Skip to content

Commit

Permalink
fetch invoiceitems from invoice line items in addition to normal sync (
Browse files Browse the repository at this point in the history
  • Loading branch information
keyn4 authored Dec 13, 2024
1 parent 6681420 commit a5f001b
Showing 1 changed file with 47 additions and 2 deletions.
49 changes: 47 additions & 2 deletions tap_stripe/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,31 @@ def get_next_page_token(
) -> Optional[Any]:
"""Return a token for identifying next page or None if no more pages."""
return None

def get_child_context(self, record, context) -> dict:
if record.get("invoice_item"):
return {"invoice_item_id": record["invoice_item"]}

def _sync_children(self, child_context: dict) -> None:
if child_context is not None:
return super()._sync_children(child_context)

class InvoiceItems(stripeStream):
"""Define InvoiceItems stream."""

name = "invoice_items"
path = "invoiceitems"
replication_key = "date"
object = "plan"
parent_stream_type = InvoiceLineItems
fetch_from_parent_stream = False
ids = set()

@property
def path(self):
path = "invoiceitems"
if self.fetch_from_parent_stream:
path = "invoiceitems/{invoice_item_id}"
return path

schema = th.PropertiesList(
th.Property("id", th.StringType),
Expand Down Expand Up @@ -232,7 +248,36 @@ class InvoiceItems(stripeStream):
th.Property("period", th.CustomType({"type": ["object", "string"]})),
th.Property("price", th.CustomType({"type": ["object", "string"]})),
).to_dict()


def request_records(self, context: Optional[dict]) -> Iterable[dict]:
# 1. fetch all invoices using rep key
invoice_item_id = None
if not self.fetch_from_parent_stream:
invoice_item_id = context.pop("invoice_item_id")
yield from super().request_records(context)
self.fetch_from_parent_stream = True
# 2. fetch invoices from parent stream
if self.fetch_from_parent_stream:
if invoice_item_id:
context.update({"invoice_item_id": invoice_item_id})
# get invoiceitem ids
yield from super().request_records(context)

def get_url_params(
self, context: Optional[dict], next_page_token: Optional[Any]
) -> Dict[str, Any]:
"""Return a dictionary of values to be used in URL parameterization."""
params = super().get_url_params(context, next_page_token)
if self.fetch_from_parent_stream:
# this params are not allowed for fetching invoiceitems by id
params.pop("created[gte]", None)
params.pop("limit", None)
return params

def post_process(self, row, context) -> dict:
if row["id"] not in self.ids:
self.ids.add(row["id"])
return super().post_process(row, context)

class Subscriptions(stripeStream):
"""Define Subscriptions stream."""
Expand Down

0 comments on commit a5f001b

Please sign in to comment.