Skip to content

Commit

Permalink
Added FULL_TABLE sync capability (1-year horizon) for reports
Browse files Browse the repository at this point in the history
  • Loading branch information
butkeraites-hotglue committed Oct 24, 2024
1 parent 1c6b0b2 commit b9ccc55
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 17 deletions.
50 changes: 50 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
{
    // Use IntelliSense to learn about possible attributes.
    // Hover to view descriptions of existing attributes.
    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
    //
    // All paths below are anchored at ${workspaceFolder} (the repository root
    // that contains this .vscode directory) so the configurations work on any
    // checkout rather than on one developer's machine only.
    "version": "0.2.0",
    "configurations": [
        {
            // Debug whichever file is currently active in the editor.
            "name": "Python Debugger: Current File",
            "type": "debugpy",
            "request": "launch",
            "program": "${file}",
            "console": "integratedTerminal"
        },
        {
            // Run the tap with --discover. The working directory is .secrets,
            // so config.json and catalog.json are resolved relative to it.
            "name": "tap-xero DISCOVER",
            "type": "debugpy",
            "request": "launch",
            "program": "${workspaceFolder}/tap_xero/__init__.py",
            "cwd": "${workspaceFolder}/.secrets",
            // NOTE(review): ">" is handed to the debugger as an ordinary
            // argument; whether the integrated terminal treats it as an output
            // redirect depends on the debugger/shell combination - confirm that
            // catalog.json is actually written.
            "args": [
                "--config",
                "config.json",
                "--discover",
                ">",
                "catalog.json"
            ],
            // Interpreter from the project-local virtual environment.
            "python": "${workspaceFolder}/venv/bin/python3",
            "console": "integratedTerminal",
            // Allow stepping into library code as well as project code.
            "justMyCode": false
        },
        {
            // Run the tap with --config and --catalog, using
            // catalog-selected.json from the .secrets directory.
            "name": "tap-xero GET",
            "type": "debugpy",
            "request": "launch",
            "program": "${workspaceFolder}/tap_xero/__init__.py",
            "cwd": "${workspaceFolder}/.secrets",
            // NOTE(review): same caveat as above for the ">" redirect into
            // data.txt - confirm it is honoured by the terminal.
            "args": [
                "--config",
                "config.json",
                "--catalog",
                "catalog-selected.json",
                ">",
                "data.txt"
            ],
            // Interpreter from the project-local virtual environment.
            "python": "${workspaceFolder}/venv/bin/python3",
            "console": "integratedTerminal",
            // Allow stepping into library code as well as project code.
            "justMyCode": false
        }
    ]
}
2 changes: 1 addition & 1 deletion tap_xero/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def load_and_write_schema(stream):
)


def sync(ctx):
def sync(ctx:Context):
ctx.refresh_credentials()
currently_syncing = ctx.state.get("currently_syncing")
start_idx = streams_.all_stream_ids.index(currently_syncing) \
Expand Down
4 changes: 2 additions & 2 deletions tap_xero/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ def set_offset(self, path, val):
def clear_offsets(self, tap_stream_id):
bks_.clear_offset(self.state, tap_stream_id)

def update_start_date_bookmark(self, path):
def update_start_date_bookmark(self, path, start_date=None):
val = self.get_bookmark(path)
if not val:
val = self.config["start_date"]
val = start_date or self.config["start_date"]
self.set_bookmark(path, val)
return val

Expand Down
33 changes: 19 additions & 14 deletions tap_xero/streams.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from requests.exceptions import HTTPError
import singer
import logging
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from singer import metadata, metrics, Transformer
from singer.utils import strptime_with_tz
import backoff

from tap_xero.context import Context
from . import transform
from dateutil.parser import parse
from dateutil.relativedelta import relativedelta
Expand Down Expand Up @@ -63,12 +65,12 @@ def _make_request(ctx, tap_stream_id, filter_options=None, attempts=0):


class Stream():
def __init__(self, tap_stream_id, pk_fields, bookmark_key="UpdatedDateUTC", format_fn=None):
def __init__(self, tap_stream_id, pk_fields, bookmark_key="UpdatedDateUTC", format_fn=None, replication_method="INCREMENTAL"):
self.tap_stream_id = tap_stream_id
self.pk_fields = pk_fields
self.format_fn = format_fn or (lambda x: x)
self.bookmark_key = bookmark_key
self.replication_method = "INCREMENTAL"
self.replication_method = replication_method
self.filter_options = {}

def metrics(self, records):
Expand Down Expand Up @@ -100,17 +102,20 @@ def sync(self, ctx):


class ReportStream(Stream):
def sync(self, ctx):
def sync(self, ctx:Context):
bookmark = [self.tap_stream_id, self.bookmark_key]
start = ctx.update_start_date_bookmark(bookmark)

start_date = (datetime.now(timezone.utc) - relativedelta(months=12)).strftime("%Y-%m-%dT%H:%M:%SZ") if self.replication_method == "FULL_TABLE" else None
start = ctx.update_start_date_bookmark(bookmark, start_date=start_date)
start_dt = parse(start)
start_dt = datetime(start_dt.year, start_dt.month, 1)
previous_budgets = set()
break_loop = False
while True:
from_date = start_dt.strftime("%Y-%m-01")
to_date = start_dt + relativedelta(months=1) - timedelta(1)
if self.replication_method == "FULL_TABLE":
to_date = start_dt + relativedelta(months=12) - timedelta(1)
else:
to_date = start_dt + relativedelta(months=1) - timedelta(1)
to_date = to_date.strftime("%Y-%m-%d")
if self.tap_stream_id == "budgets":
self.filter_options.update(dict(DateFrom=from_date, DateTo=to_date))
Expand Down Expand Up @@ -160,7 +165,7 @@ class PaginatedStream(Stream):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def sync(self, ctx):
def sync(self, ctx:Context):
bookmark = [self.tap_stream_id, self.bookmark_key]
offset = [self.tap_stream_id, "page"]
start = ctx.update_start_date_bookmark(bookmark)
Expand Down Expand Up @@ -190,7 +195,7 @@ class Contacts(PaginatedStream):
def __init__(self, *args, **kwargs):
super().__init__("contacts", ["ContactID"], format_fn=transform.format_contacts, *args, **kwargs)

def sync(self, ctx):
def sync(self, ctx:Context):
# Parameter to collect archived contacts from the Xero platform
if ctx.config.get("include_archived_contacts") in ["true", True]:
self.filter_options.update({'includeArchived': "true"})
Expand All @@ -202,7 +207,7 @@ class Journals(Stream):
"""The Journals endpoint is a special case. It has its own way of ordering
and paging the data. See
https://developer.xero.com/documentation/api/journals"""
def sync(self, ctx):
def sync(self, ctx:Context):
bookmark = [self.tap_stream_id, self.bookmark_key]
journal_number = ctx.get_bookmark(bookmark) or 0
while True:
Expand All @@ -225,7 +230,7 @@ def sync(self, ctx):
if not records or len(records) < FULL_PAGE_SIZE:
break

def write_records(self, records, ctx):
def write_records(self, records, ctx:Context):
""""Custom implementation from the write records method available in Stream class"""
stream = ctx.catalog.get_stream(self.tap_stream_id)
schema = stream.schema.to_dict()
Expand Down Expand Up @@ -267,7 +272,7 @@ class LinkedTransactions(Stream):
the UpdatedDateUTC timestamp in them. Therefore we must always iterate over
all of the data, but we can manually omit records based on the
UpdatedDateUTC property."""
def sync(self, ctx):
def sync(self, ctx:Context):
bookmark = [self.tap_stream_id, self.bookmark_key]
offset = [self.tap_stream_id, "page"]
start = ctx.update_start_date_bookmark(bookmark)
Expand Down Expand Up @@ -297,7 +302,7 @@ def __init__(self, *args, **kwargs):
self.bookmark_key = None
self.replication_method = "FULL_TABLE"

def sync(self, ctx):
def sync(self, ctx:Context):
records = _make_request(ctx, self.tap_stream_id)
self.format_fn(records)
self.write_records(records, ctx)
Expand Down Expand Up @@ -354,6 +359,6 @@ def sync(self, ctx):
ReportStream("reports_profit_and_loss", ["from_date"], bookmark_key="to_date"),
ReportStream("reports_balance_sheet", ["from_date"], bookmark_key="to_date"),
ReportStream("budgets", ["from_date"], bookmark_key="to_date"),
ReportStream("reports_bank_summary", ["from_date"], bookmark_key="to_date"),
ReportStream("reports_bank_summary", ["from_date"], bookmark_key="to_date", replication_method="FULL_TABLE"),
]
all_stream_ids = [s.tap_stream_id for s in all_streams]

0 comments on commit b9ccc55

Please sign in to comment.