Skip to content

Commit

Permalink
fix streams to be more DRY
Browse files Browse the repository at this point in the history
  • Loading branch information
Jong authored and Jong committed Jan 17, 2024
1 parent 09a638e commit 6b5d258
Showing 1 changed file with 60 additions and 181 deletions.
241 changes: 60 additions & 181 deletions tap_restaurant365/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,60 @@
from tap_restaurant365.client import Restaurant365Stream


class ThirtyDaysStream(Restaurant365Stream):
"""parent class stream for override/pagination"""

name = "vendors"
path = "/Company"

def get_next_page_token(
self, response: requests.Response, previous_token: t.Optional[t.Any]
) -> t.Optional[t.Any]:
"""Return a token for identifying next page or None if no more pages."""
if self.paginate == True:
# start_date = self.config.get("start_date")
start_date = (parser.parse(self.tap_state["bookmarks"][self.name]['starting_replication_value']) + timedelta(seconds=1)) or parser.parse(self.config.get("start_date"))
today = datetime.today()
previous_token = previous_token or start_date
next_token = (previous_token + timedelta(days=30)).replace(tzinfo=None)

if (today - next_token).days < 30:
self.paginate = False
return next_token
else:
return None

def get_url_params(
self,
context: dict | None, # noqa: ARG002
next_page_token: Any | None, # noqa: ANN401
) -> dict[str, Any]:

params: dict = {}
#x.strftime('%Y-%m-%dT%H:%M:%SZ')

start_date = next_page_token or self.get_starting_time(context)
end_date = start_date + timedelta(days = 30)
if self.replication_key:
params["$filter"] = f"{self.replication_key} ge {start_date.strftime('%Y-%m-%dT%H:%M:%SZ')} and {self.replication_key} lt {end_date.strftime('%Y-%m-%dT%H:%M:%SZ')}"
if self.name == "journal_entries":
#set a date in the stream to check later to see if we need to keep calling to the stream
params["$filter"] += f" and type eq 'Journal Entry'"
if self.name == "bills":
params["$filter"] += f" and type eq 'AP Invoice'"
if self.name == "credit_memos":
params["$filter"] += f" and type eq 'AP Credit Memo'"
if self.name == "stock_count":
params["$filter"] += f" and type eq 'Stock Count'"
if self.name == "bank_expenses":
params["$filter"] += f" and type eq 'Bank Expense'"
#

return params




class AccountsStream(Restaurant365Stream):
"""Define custom stream."""

Expand Down Expand Up @@ -48,7 +102,7 @@ class AccountsStream(Restaurant365Stream):
).to_dict()


class TransactionsStream(Restaurant365Stream):
class TransactionsStream(ThirtyDaysStream):
"""Define custom stream."""

name = "transaction"
Expand All @@ -73,52 +127,6 @@ class TransactionsStream(Restaurant365Stream):
th.Property("modifiedBy", th.StringType)
).to_dict()

def get_next_page_token(
self, response: requests.Response, previous_token: t.Optional[t.Any]
) -> t.Optional[t.Any]:
"""Return a token for identifying next page or None if no more pages."""
if self.paginate == True:
# start_date = self.config.get("start_date")
start_date = (parser.parse(self.tap_state["bookmarks"][self.name]['starting_replication_value']) + timedelta(seconds=1)) or parser.parse(self.config.get("start_date"))
today = datetime.today()
previous_token = previous_token or start_date
next_token = (previous_token + timedelta(days=30)).replace(tzinfo=None)

if (today - next_token).days < 30:
self.paginate = False
return next_token
else:
return None

def get_url_params(
self,
context: dict | None, # noqa: ARG002
next_page_token: Any | None, # noqa: ANN401
) -> dict[str, Any]:

params: dict = {}
#x.strftime('%Y-%m-%dT%H:%M:%SZ')


start_date = next_page_token or self.get_starting_time(context)
end_date = start_date + timedelta(days = 30)
if self.replication_key:
params["$filter"] = f"{self.replication_key} ge {start_date.strftime('%Y-%m-%dT%H:%M:%SZ')} and {self.replication_key} lt {end_date.strftime('%Y-%m-%dT%H:%M:%SZ')}"
if self.name == "journal_entries":
#set a date in the stream to check later to see if we need to keep calling to the stream
params["$filter"] += f" and type eq 'Journal Entry'"
if self.name == "bills":
params["$filter"] += f" and type eq 'AP Invoice'"
if self.name == "credit_memos":
params["$filter"] += f" and type eq 'AP Credit Memo'"
if self.name == "stock_count":
params["$filter"] += f" and type eq 'Stock Count'"
if self.name == "bank_expenses":
params["$filter"] += f" and type eq 'Bank Expense'"
#
return params



class BillsStream(TransactionsStream):
"""Define custom stream."""
Expand Down Expand Up @@ -166,6 +174,7 @@ class BankExpensesStream(TransactionsStream):
replication_key = "modifiedOn"
paginate = True


class VendorsStream(Restaurant365Stream):
"""Define custom stream."""

Expand Down Expand Up @@ -346,7 +355,7 @@ class POSEmployeeStream(Restaurant365Stream):

).to_dict()

class SalesEmployeeStream(Restaurant365Stream):
class SalesEmployeeStream(ThirtyDaysStream):
"""Define custom stream."""

name = "sales_employee"
Expand Down Expand Up @@ -383,41 +392,9 @@ class SalesEmployeeStream(Restaurant365Stream):

).to_dict()

def get_next_page_token(
self, response: requests.Response, previous_token: t.Optional[t.Any]
) -> t.Optional[t.Any]:
"""Return a token for identifying next page or None if no more pages."""
if self.paginate == True:
# start_date = self.config.get("start_date")
start_date = (parser.parse(self.tap_state["bookmarks"][self.name]['starting_replication_value']) + timedelta(seconds=1)) or parser.parse(self.config.get("start_date"))
today = datetime.today()
previous_token = previous_token or start_date
next_token = (previous_token + timedelta(days=30)).replace(tzinfo=None)

if (today - next_token).days < 30:
self.paginate = False
return next_token
else:
return None

def get_url_params(
self,
context: dict | None, # noqa: ARG002
next_page_token: Any | None, # noqa: ANN401
) -> dict[str, Any]:

params: dict = {}
#x.strftime('%Y-%m-%dT%H:%M:%SZ')

start_date = next_page_token or self.get_starting_time(context)
end_date = start_date + timedelta(days = 30)
if self.replication_key:
params["$filter"] = f"{self.replication_key} ge {start_date.strftime('%Y-%m-%dT%H:%M:%SZ')} and {self.replication_key} lt {end_date.strftime('%Y-%m-%dT%H:%M:%SZ')}"

return params


class SalesDetailStream(Restaurant365Stream):
class SalesDetailStream(ThirtyDaysStream):
"""Define custom stream."""

name = "sales_detail"
Expand Down Expand Up @@ -449,41 +426,8 @@ class SalesDetailStream(Restaurant365Stream):

).to_dict()

def get_next_page_token(
self, response: requests.Response, previous_token: t.Optional[t.Any]
) -> t.Optional[t.Any]:
"""Return a token for identifying next page or None if no more pages."""
if self.paginate == True:
# start_date = self.config.get("start_date")
start_date = (parser.parse(self.tap_state["bookmarks"][self.name]['starting_replication_value']) + timedelta(seconds=1)) or parser.parse(self.config.get("start_date"))
today = datetime.today()
previous_token = previous_token or start_date
next_token = (previous_token + timedelta(days=30)).replace(tzinfo=None)

if (today - next_token).days < 30:
self.paginate = False
return next_token
else:
return None

def get_url_params(
self,
context: dict | None, # noqa: ARG002
next_page_token: Any | None, # noqa: ANN401
) -> dict[str, Any]:

params: dict = {}
#x.strftime('%Y-%m-%dT%H:%M:%SZ')

start_date = next_page_token or self.get_starting_time(context)
end_date = start_date + timedelta(days = 30)
if self.replication_key:
params["$filter"] = f"{self.replication_key} ge {start_date.strftime('%Y-%m-%dT%H:%M:%SZ')} and {self.replication_key} lt {end_date.strftime('%Y-%m-%dT%H:%M:%SZ')}"

return params


class SalesPaymentStream(Restaurant365Stream):
class SalesPaymentStream(ThirtyDaysStream):
"""Define custom stream."""

name = "sales_payment"
Expand Down Expand Up @@ -517,38 +461,6 @@ class SalesPaymentStream(Restaurant365Stream):

).to_dict()

def get_next_page_token(
self, response: requests.Response, previous_token: t.Optional[t.Any]
) -> t.Optional[t.Any]:
"""Return a token for identifying next page or None if no more pages."""
if self.paginate == True:
# start_date = self.config.get("start_date")
start_date = (parser.parse(self.tap_state["bookmarks"][self.name]['starting_replication_value']) + timedelta(seconds=1)) or parser.parse(self.config.get("start_date"))
today = datetime.today()
previous_token = previous_token or start_date
next_token = (previous_token + timedelta(days=30)).replace(tzinfo=None)

if (today - next_token).days < 30:
self.paginate = False
return next_token
else:
return None

def get_url_params(
self,
context: dict | None, # noqa: ARG002
next_page_token: Any | None, # noqa: ANN401
) -> dict[str, Any]:

params: dict = {}
#x.strftime('%Y-%m-%dT%H:%M:%SZ')

start_date = next_page_token or self.get_starting_time(context)
end_date = start_date + timedelta(days = 30)
if self.replication_key:
params["$filter"] = f"{self.replication_key} ge {start_date.strftime('%Y-%m-%dT%H:%M:%SZ')} and {self.replication_key} lt {end_date.strftime('%Y-%m-%dT%H:%M:%SZ')}"

return params


class EntityDeletedStream(Restaurant365Stream):
Expand All @@ -566,7 +478,7 @@ class EntityDeletedStream(Restaurant365Stream):

).to_dict()

class TransactionDetailsStream(Restaurant365Stream):
class TransactionDetailsStream(ThirtyDaysStream):
"""Define custom stream."""

name = "transaction_detail"
Expand Down Expand Up @@ -595,36 +507,3 @@ class TransactionDetailsStream(Restaurant365Stream):
th.Property("modifiedOn", th.DateTimeType),

).to_dict()

def get_next_page_token(
self, response: requests.Response, previous_token: t.Optional[t.Any]
) -> t.Optional[t.Any]:
"""Return a token for identifying next page or None if no more pages."""
if self.paginate == True:
# start_date = self.config.get("start_date")
start_date = (parser.parse(self.tap_state["bookmarks"][self.name]['starting_replication_value']) + timedelta(seconds=1)) or parser.parse(self.config.get("start_date"))
today = datetime.today()
previous_token = previous_token or start_date
next_token = (previous_token + timedelta(days=30)).replace(tzinfo=None)

if (today - next_token).days < 30:
self.paginate = False
return next_token
else:
return None

def get_url_params(
self,
context: dict | None, # noqa: ARG002
next_page_token: Any | None, # noqa: ANN401
) -> dict[str, Any]:

params: dict = {}
#x.strftime('%Y-%m-%dT%H:%M:%SZ')

start_date = next_page_token or self.get_starting_time(context)
end_date = start_date + timedelta(days = 30)
if self.replication_key:
params["$filter"] = f"{self.replication_key} ge {start_date.strftime('%Y-%m-%dT%H:%M:%SZ')} and {self.replication_key} lt {end_date.strftime('%Y-%m-%dT%H:%M:%SZ')}"

return params

0 comments on commit 6b5d258

Please sign in to comment.