diff --git a/tap_stripe/streams.py b/tap_stripe/streams.py index bbd15aa..08523ce 100644 --- a/tap_stripe/streams.py +++ b/tap_stripe/streams.py @@ -194,15 +194,31 @@ def get_next_page_token( ) -> Optional[Any]: """Return a token for identifying next page or None if no more pages.""" return None + + def get_child_context(self, record, context) -> dict: + if record.get("invoice_item"): + return {"invoice_item_id": record["invoice_item"]} + def _sync_children(self, child_context: dict) -> None: + if child_context is not None: + return super()._sync_children(child_context) class InvoiceItems(stripeStream): """Define InvoiceItems stream.""" name = "invoice_items" - path = "invoiceitems" replication_key = "date" object = "plan" + parent_stream_type = InvoiceLineItems + fetch_from_parent_stream = False + ids = set() + + @property + def path(self): + path = "invoiceitems" + if self.fetch_from_parent_stream: + path = "invoiceitems/{invoice_item_id}" + return path schema = th.PropertiesList( th.Property("id", th.StringType), @@ -232,7 +248,36 @@ class InvoiceItems(stripeStream): th.Property("period", th.CustomType({"type": ["object", "string"]})), th.Property("price", th.CustomType({"type": ["object", "string"]})), ).to_dict() - + + def request_records(self, context: Optional[dict]) -> Iterable[dict]: + # 1. fetch all invoices using rep key + invoice_item_id = None + if not self.fetch_from_parent_stream: + invoice_item_id = context.pop("invoice_item_id") + yield from super().request_records(context) + self.fetch_from_parent_stream = True + # 2. fetch invoices from parent stream + if self.fetch_from_parent_stream: + if invoice_item_id: + context.update({"invoice_item_id": invoice_item_id}) + # get invoiceitem ids + yield from super().request_records(context) + + def get_url_params( + self, context: Optional[dict], next_page_token: Optional[Any] + ) -> Dict[str, Any]: + """Return a dictionary of values to be used in URL parameterization.""" + params = super().get_url_params(context, next_page_token) + if self.fetch_from_parent_stream: + # this params are not allowed for fetching invoiceitems by id + params.pop("created[gte]", None) + params.pop("limit", None) + return params + + def post_process(self, row, context) -> dict: + if row["id"] not in self.ids: + self.ids.add(row["id"]) + return super().post_process(row, context) class Subscriptions(stripeStream): """Define Subscriptions stream."""