From 1bfaa1c8a73a9ee7866bd90ad0853f807093fb10 Mon Sep 17 00:00:00 2001 From: Jong Date: Wed, 15 Nov 2023 16:47:23 -0500 Subject: [PATCH] fixed incremental streaming --- poetry.lock | 14 +++++++------- tap_restaurant365/client.py | 18 ++++++++++-------- tap_restaurant365/streams.py | 6 ++++-- 3 files changed, 21 insertions(+), 17 deletions(-) diff --git a/poetry.lock b/poetry.lock index 4dcce92..67f59e0 100644 --- a/poetry.lock +++ b/poetry.lock @@ -45,17 +45,17 @@ files = [ [[package]] name = "boto3" -version = "1.29.0" +version = "1.29.1" description = "The AWS SDK for Python" optional = true python-versions = ">= 3.7" files = [ - {file = "boto3-1.29.0-py3-none-any.whl", hash = "sha256:91c72fa4848eda9311c273db667946bd9d953285ae8d54b7bbad541b74adc254"}, - {file = "boto3-1.29.0.tar.gz", hash = "sha256:3e90ea2faa3e9892b9140f857911f9ef0013192a106f50d0ec7b71e8d1afc90a"}, + {file = "boto3-1.29.1-py3-none-any.whl", hash = "sha256:192695305fa65012d21f78ee852b91cb56dd571e84d51fb71f756302bf19d23f"}, + {file = "boto3-1.29.1.tar.gz", hash = "sha256:20285ebf4e98b2905a88aeb162b4f77ff908b2e3e31038b3223e593789290aa3"}, ] [package.dependencies] -botocore = ">=1.32.0,<1.33.0" +botocore = ">=1.32.1,<1.33.0" jmespath = ">=0.7.1,<2.0.0" s3transfer = ">=0.7.0,<0.8.0" @@ -64,13 +64,13 @@ crt = ["botocore[crt] (>=1.21.0,<2.0a0)"] [[package]] name = "botocore" -version = "1.32.0" +version = "1.32.1" description = "Low-level, data-driven core of boto 3." optional = true python-versions = ">= 3.7" files = [ - {file = "botocore-1.32.0-py3-none-any.whl", hash = "sha256:9c1e143feb6a04235cec342d2acb31a0f44df3c89f309f839e03e38a75f3f44e"}, - {file = "botocore-1.32.0.tar.gz", hash = "sha256:95fe3357b9ddc4559941dbea0f0a6b8fc043305f013b7ae2a85dff0c3b36ee92"}, + {file = "botocore-1.32.1-py3-none-any.whl", hash = "sha256:1d9c0ff3eb7828a8bd8c5c7f12cd9d8c05c6fe4c616ef963fdaab538a0da3809"}, + {file = "botocore-1.32.1.tar.gz", hash = "sha256:fcf3cc2913afba8e5f7ebcc15e8f6bfae844ab64bf983bf5a6fe3bb54cce239d"}, ] [package.dependencies] diff --git a/tap_restaurant365/client.py b/tap_restaurant365/client.py index 5ca9dfe..49ab7ea 100644 --- a/tap_restaurant365/client.py +++ b/tap_restaurant365/client.py @@ -65,6 +65,14 @@ def get_new_paginator(self) -> BaseAPIPaginator: return super().get_new_paginator() + def get_starting_time(self, context): + start_date = self.config.get("start_date") + if start_date: + start_date = parser.parse(self.config.get("start_date")) + rep_key = self.get_starting_timestamp(context) + return rep_key or start_date + + def get_url_params( @@ -84,14 +92,8 @@ def get_url_params( params: dict = {} if self.replication_key: - params["$filter"] = f"{self.replication_key} ge {self.config.get('start_date')}" - if self.name == "journal_entries": - #set a date in the stream to check later to see if we need to keep calling to the stream - params["$filter"] += f" and type eq 'Journal Entry' and {self.replication_key} lt {(parser.parse(self.config.get('start_date')) + timedelta(days=30)).isoformat()}" - # if self.name == "bills": - # #set a date in the stream to check later to see if we need to keep calling to the stream - # self.started_on = (parser.parse(self.config.get('start_date')) + timedelta(days=30)).isoformat() - # params["$filter"] += f" and type eq 'AP Invoices' and {self.replication_key} lt {self.started_on}" + start_date = self.get_starting_time(context).strftime('%Y-%m-%dT%H:%M:%SZ') + params["$filter"] = f"{self.replication_key} ge {start_date}" return params diff --git a/tap_restaurant365/streams.py b/tap_restaurant365/streams.py index 9c23dcc..dae32f0 100644 --- a/tap_restaurant365/streams.py +++ b/tap_restaurant365/streams.py @@ -78,8 +78,10 @@ def get_next_page_token( ) -> t.Optional[t.Any]: """Return a token for identifying next page or None if no more pages.""" if self.paginate == True: + # start_date = self.config.get("start_date") + start_date = parser.parse(self.tap_state["bookmarks"][self.name]['starting_replication_value']) or parser.parse(self.config.get("start_date")) today = datetime.today() - previous_token = previous_token or parser.parse(self.config.get("start_date")) + previous_token = previous_token or start_date next_token = (previous_token + timedelta(days=30)).replace(tzinfo=None) if (today - next_token).days < 30: @@ -98,7 +100,7 @@ def get_url_params( #x.strftime('%Y-%m-%dT%H:%M:%SZ') - start_date = next_page_token or parser.parse(self.config.get("start_date")) + start_date = next_page_token or self.get_starting_time(context) end_date = start_date + timedelta(days = 30) if self.replication_key: params["$filter"] = f"{self.replication_key} ge {start_date.strftime('%Y-%m-%dT%H:%M:%SZ')} and {self.replication_key} lt {end_date.strftime('%Y-%m-%dT%H:%M:%SZ')}"