diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..a557198 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,50 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python Debugger: Current File", + "type": "debugpy", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal" + }, + { + "name": "tap-xero DISCOVER", + "type": "debugpy", + "request": "launch", + "program": "../tap_xero/__init__.py", + "cwd": "/Users/renanbutkeraites/dev/tap-xero/.secrets", + "args": [ + "--config", + "config.json", + "--discover", + ">", + "catalog.json", + ], + "python": "/Users/renanbutkeraites/dev/tap-xero/venv/bin/python3", + "console": "integratedTerminal", + "justMyCode": false + }, + { + "name": "tap-xero GET", + "type": "debugpy", + "request": "launch", + "program": "../tap_xero/__init__.py", + "cwd": "/Users/renanbutkeraites/dev/tap-xero/.secrets", + "args": [ + "--config", + "config.json", + "--catalog", + "catalog-selected.json", + ">", + "data.txt", + ], + "python": "/Users/renanbutkeraites/dev/tap-xero/venv/bin/python3", + "console": "integratedTerminal", + "justMyCode": false + } + ] +} \ No newline at end of file diff --git a/tap_xero/__init__.py b/tap_xero/__init__.py index f0bf2af..937295c 100644 --- a/tap_xero/__init__.py +++ b/tap_xero/__init__.py @@ -93,7 +93,7 @@ def load_and_write_schema(stream): ) -def sync(ctx): +def sync(ctx:Context): ctx.refresh_credentials() currently_syncing = ctx.state.get("currently_syncing") start_idx = streams_.all_stream_ids.index(currently_syncing) \ diff --git a/tap_xero/context.py b/tap_xero/context.py index fabf002..d771f9d 100644 --- a/tap_xero/context.py +++ b/tap_xero/context.py @@ -33,10 +33,10 @@ def set_offset(self, path, val): def clear_offsets(self, tap_stream_id): bks_.clear_offset(self.state, tap_stream_id) - def update_start_date_bookmark(self, path): + def update_start_date_bookmark(self, path, start_date=None): val = self.get_bookmark(path) if not val: - val = self.config["start_date"] + val = start_date or self.config["start_date"] self.set_bookmark(path, val) return val diff --git a/tap_xero/streams.py b/tap_xero/streams.py index 49ead58..6b74f8b 100644 --- a/tap_xero/streams.py +++ b/tap_xero/streams.py @@ -1,10 +1,12 @@ from requests.exceptions import HTTPError import singer import logging -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from singer import metadata, metrics, Transformer from singer.utils import strptime_with_tz import backoff + +from tap_xero.context import Context from . import transform from dateutil.parser import parse from dateutil.relativedelta import relativedelta @@ -63,12 +65,12 @@ def _make_request(ctx, tap_stream_id, filter_options=None, attempts=0): class Stream(): - def __init__(self, tap_stream_id, pk_fields, bookmark_key="UpdatedDateUTC", format_fn=None): + def __init__(self, tap_stream_id, pk_fields, bookmark_key="UpdatedDateUTC", format_fn=None, replication_method="INCREMENTAL"): self.tap_stream_id = tap_stream_id self.pk_fields = pk_fields self.format_fn = format_fn or (lambda x: x) self.bookmark_key = bookmark_key - self.replication_method = "INCREMENTAL" + self.replication_method = replication_method self.filter_options = {} def metrics(self, records): @@ -100,17 +102,20 @@ def sync(self, ctx): class ReportStream(Stream): - def sync(self, ctx): + def sync(self, ctx:Context): bookmark = [self.tap_stream_id, self.bookmark_key] - start = ctx.update_start_date_bookmark(bookmark) - + start_date = (datetime.now(timezone.utc) - relativedelta(months=12)).strftime("%Y-%m-%dT%H:%M:%SZ") if self.replication_method == "FULL_TABLE" else None + start = ctx.update_start_date_bookmark(bookmark, start_date=start_date) start_dt = parse(start) start_dt = datetime(start_dt.year, start_dt.month, 1) previous_budgets = set() break_loop = False while True: from_date = start_dt.strftime("%Y-%m-01") - to_date = start_dt + relativedelta(months=1) - timedelta(1) + if self.replication_method == "FULL_TABLE": + to_date = start_dt + relativedelta(months=12) - timedelta(1) + else: + to_date = start_dt + relativedelta(months=1) - timedelta(1) to_date = to_date.strftime("%Y-%m-%d") if self.tap_stream_id == "budgets": self.filter_options.update(dict(DateFrom=from_date, DateTo=to_date)) @@ -160,7 +165,7 @@ class PaginatedStream(Stream): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - def sync(self, ctx): + def sync(self, ctx:Context): bookmark = [self.tap_stream_id, self.bookmark_key] offset = [self.tap_stream_id, "page"] start = ctx.update_start_date_bookmark(bookmark) @@ -190,7 +195,7 @@ class Contacts(PaginatedStream): def __init__(self, *args, **kwargs): super().__init__("contacts", ["ContactID"], format_fn=transform.format_contacts, *args, **kwargs) - def sync(self, ctx): + def sync(self, ctx:Context): # Parameter to collect archived contacts from the Xero platform if ctx.config.get("include_archived_contacts") in ["true", True]: self.filter_options.update({'includeArchived': "true"}) @@ -202,7 +207,7 @@ class Journals(Stream): """The Journals endpoint is a special case. It has its own way of ordering and paging the data. See https://developer.xero.com/documentation/api/journals""" - def sync(self, ctx): + def sync(self, ctx:Context): bookmark = [self.tap_stream_id, self.bookmark_key] journal_number = ctx.get_bookmark(bookmark) or 0 while True: @@ -225,7 +230,7 @@ def sync(self, ctx): if not records or len(records) < FULL_PAGE_SIZE: break - def write_records(self, records, ctx): + def write_records(self, records, ctx:Context): """"Custom implementation from the write records method available in Stream class""" stream = ctx.catalog.get_stream(self.tap_stream_id) schema = stream.schema.to_dict() @@ -267,7 +272,7 @@ class LinkedTransactions(Stream): the UpdatedDateUTC timestamp in them. Therefore we must always iterate over all of the data, but we can manually omit records based on the UpdatedDateUTC property.""" - def sync(self, ctx): + def sync(self, ctx:Context): bookmark = [self.tap_stream_id, self.bookmark_key] offset = [self.tap_stream_id, "page"] start = ctx.update_start_date_bookmark(bookmark) @@ -297,7 +302,7 @@ def __init__(self, *args, **kwargs): self.bookmark_key = None self.replication_method = "FULL_TABLE" - def sync(self, ctx): + def sync(self, ctx:Context): records = _make_request(ctx, self.tap_stream_id) self.format_fn(records) self.write_records(records, ctx) @@ -354,6 +359,6 @@ def sync(self, ctx): ReportStream("reports_profit_and_loss", ["from_date"], bookmark_key="to_date"), ReportStream("reports_balance_sheet", ["from_date"], bookmark_key="to_date"), ReportStream("budgets", ["from_date"], bookmark_key="to_date"), - ReportStream("reports_bank_summary", ["from_date"], bookmark_key="to_date"), + ReportStream("reports_bank_summary", ["from_date"], bookmark_key="to_date", replication_method="FULL_TABLE"), ] all_stream_ids = [s.tap_stream_id for s in all_streams]