diff --git a/tap_klaviyo/client.py b/tap_klaviyo/client.py index 378e3bd..cf70d77 100644 --- a/tap_klaviyo/client.py +++ b/tap_klaviyo/client.py @@ -18,6 +18,9 @@ from urllib3.exceptions import ProtocolError, InvalidChunkLength from requests.exceptions import ReadTimeout, ChunkedEncodingError import backoff +import os +import json +import logging class KlaviyoStream(RESTStream): @@ -47,7 +50,7 @@ def authenticator(self): def http_headers(self) -> dict: """Return the http headers needed.""" headers = {} - headers["revision"] = "2024-02-15" + headers["revision"] = "2024-10-15" if "user_agent" in self.config: headers["User-Agent"] = self.config.get("user_agent") return headers @@ -139,6 +142,34 @@ def get_jsonschema_type(self, obj): else: return th.CustomType({"type": ["string", "number", "object"]}) + def get_abs_path(self, path): + return os.path.join(os.path.dirname(os.path.realpath(__file__)), path) + + def load_schema(self, name): + return json.load(open(self.get_abs_path('schemas/{}.json'.format(name)))) + + def _fill_missing_properties(self, property_list): + try: + default_schema = self.load_schema(self.name) + except: + logging.warning(f"The stream '{self.name}' does not have a default schema.") + return property_list + + new_properties = default_schema.get('properties', {}) + + def recursive_copy(source, target): + for key, value in source.items(): + if isinstance(value, dict): + target[key] = target.get(key, {}) + recursive_copy(value, target[key]) + else: + target[key] = value + + recursive_copy(property_list.get('properties', {}), new_properties) + + property_list['properties'] = new_properties + return property_list + def get_schema(self) -> dict: """Dynamically detect the json schema for the stream. This is evaluated prior to any records being retrieved. @@ -212,13 +243,13 @@ def get_schema(self) -> dict: ) # Return the list as a JSON Schema dictionary object property_list = th.PropertiesList(*properties).to_dict() - - return property_list else: - return th.PropertiesList( + property_list = th.PropertiesList( th.Property("id", th.StringType), th.Property(self.replication_key, th.DateTimeType), ).to_dict() + property_list = self._fill_missing_properties(property_list) + return property_list @cached_property def schema(self) -> dict: diff --git a/tap_klaviyo/schemas/reviews.json b/tap_klaviyo/schemas/reviews.json new file mode 100644 index 0000000..0542a97 --- /dev/null +++ b/tap_klaviyo/schemas/reviews.json @@ -0,0 +1,256 @@ +{ + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "review" + ] + }, + "id": { + "type": "string", + "description": "The ID of the review" + }, + "attributes": { + "type": "object", + "properties": { + "email": { + "type": "string", + "description": "The email of the author of this review" + }, + "status": { + "type": "string", + "enum": [ + "featured", + "pending", + "published", + "rejected", + "unpublished" + ], + "description": "The status of this review" + }, + "verified": { + "type": "boolean", + "description": "The verification status of this review (aka whether or not we have confirmation that the customer bought the product)" + }, + "review_type": { + "type": "string", + "enum": [ + "review", + "question", + "rating" + ], + "description": "The type of this review" + }, + "created": { + "type": "string", + "format": "date-time", + "description": "The datetime when this review was created" + }, + "updated": { + "type": "string", + "format": "date-time", + "description": "The datetime when this review was updated" + }, + "images": { + "type": "array", + "items": { + "type": [ + "string", + "null" + ] + }, + "description": "The list of images submitted with this review (represented as a list of urls). If there are no images, this field will be an empty list." + }, + "product": { + "type": [ + "object", + "null" + ], + "description": "The product associated with this review", + "properties": { + "url": { + "type": "string", + "description": "The URL of the product" + }, + "name": { + "type": "string", + "description": "The name of the product" + }, + "image_url": { + "type": "string", + "description": "The URL of the product image" + } + } + }, + "rating": { + "type": [ + "integer", + "null" + ], + "description": "The rating of this review on a scale from 1-5" + }, + "author": { + "type": [ + "string", + "null" + ], + "description": "The author of this review" + }, + "content": { + "type": [ + "string", + "null" + ], + "description": "The content of this review" + }, + "title": { + "type": [ + "string", + "null" + ], + "description": "The title of this review" + }, + "smart_quote": { + "type": [ + "string", + "null" + ], + "description": "A quote from this review that summarizes the content" + }, + "public_reply": { + "type": [ + "object", + "null" + ], + "properties": { + "content": { + "type": "string", + "description": "The content of the public reply" + }, + "author": { + "type": "string", + "description": "The author of the public reply" + }, + "updated": { + "type": "string", + "format": "date-time", + "description": "The datetime when this public reply was updated" + } + } + } + }, + "required": [ + "email", + "verified", + "review_type", + "created", + "updated", + "images" + ] + }, + "links": { + "type": "object", + "properties": { + "self": { + "type": "string", + "format": "uri", + "description": "Self link" + } + }, + "required": [ + "self" + ] + }, + "relationships": { + "type": [ + "object", + "null" + ], + "properties": { + "events": { + "type": [ + "object", + "null" + ], + "properties": { + "data": { + "type": [ + "array", + "null" + ], + "items": { + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "event" + ] + }, + "id": { + "type": "string", + "description": "Related Events" + } + }, + "required": [ + "type", + "id" + ] + } + }, + "links": { + "type": "object", + "properties": { + "self": { + "type": "string", + "format": "uri", + "description": "Self link" + }, + "related": { + "type": "string", + "format": "uri", + "description": "Related link" + } + }, + "required": [ + "self", + "related" + ] + } + } + }, + "item": { + "type": "object", + "properties": { + "links": { + "type": "object", + "properties": { + "self": { + "type": "string", + "format": "uri", + "description": "Self link" + }, + "related": { + "type": "string", + "format": "uri", + "description": "Related link" + } + }, + "required": [ + "self", + "related" + ] + } + } + } + } + } + }, + "required": [ + "type", + "id", + "attributes", + "links" + ] +} \ No newline at end of file diff --git a/tap_klaviyo/streams.py b/tap_klaviyo/streams.py index eb49bc6..f4c01b5 100644 --- a/tap_klaviyo/streams.py +++ b/tap_klaviyo/streams.py @@ -1,5 +1,6 @@ """Stream type classes for tap-klaviyo.""" +from typing import Any, Dict, Optional from tap_klaviyo.client import KlaviyoStream @@ -50,3 +51,24 @@ class ListMembersStream(KlaviyoStream): primary_keys = ["id"] replication_key = "joined_group_at" parent_stream_type = ListsStream + +class ReviewsStream(KlaviyoStream): + """Define custom stream.""" + + name = "reviews" + path = "/reviews" + primary_keys = ["id"] + replication_key = "created" + + def get_url_params( + self, context: Optional[dict], next_page_token: Optional[Any] + ) -> Dict[str, Any]: + """Return a dictionary of values to be used in URL parameterization.""" + params: dict = {} + if next_page_token: + params["page[cursor]"] = next_page_token + start_date = self.get_starting_time(context) + if self.replication_key and start_date: + start_date = start_date.strftime("%Y-%m-%dT%H:%M:%SZ") + params["filter"] = f"greater-or-equal({self.replication_key},{start_date})" + return params \ No newline at end of file diff --git a/tap_klaviyo/tap.py b/tap_klaviyo/tap.py index a6979e1..540766d 100644 --- a/tap_klaviyo/tap.py +++ b/tap_klaviyo/tap.py @@ -11,6 +11,7 @@ ListMembersStream, ListsStream, MetricsStream, + ReviewsStream, ) STREAM_TYPES = [ @@ -19,6 +20,7 @@ MetricsStream, EventsStream, ListMembersStream, + ReviewsStream, ]