diff --git a/.github/workflows/ci_workflow.yml b/.github/workflows/ci_workflow.yml new file mode 100644 index 0000000..c4dec1b --- /dev/null +++ b/.github/workflows/ci_workflow.yml @@ -0,0 +1,58 @@ +### A CI workflow template that runs linting and python testing +### TODO: Modify as needed or as desired. + +name: Test tap-klaviyo-v2 + +on: [push] + +jobs: + linting: + + runs-on: ubuntu-latest + strategy: + matrix: + # Only lint using the primary version used for dev + python-version: [3.9] + + steps: + - uses: actions/checkout@v2 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + - name: Install Poetry + uses: snok/install-poetry@v1 + with: + version: 1.1.8 + - name: Install dependencies + run: | + poetry install + - name: Run lint command from tox.ini + run: | + poetry run tox -e lint + + pytest: + + runs-on: ubuntu-latest + env: + GITHUB_TOKEN: ${{secrets.GITHUB_TOKEN}} + strategy: + matrix: + python-version: [3.7, 3.8, 3.9] + + steps: + - uses: actions/checkout@v2 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + - name: Install Poetry + uses: snok/install-poetry@v1 + with: + version: 1.1.11 + - name: Install dependencies + run: | + poetry install + - name: Test with pytest + run: | + poetry run pytest --capture=no diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..475019c --- /dev/null +++ b/.gitignore @@ -0,0 +1,136 @@ +# Secrets and internal config files +**/.secrets/* + +# Ignore meltano internal cache and sqlite systemdb + +.meltano/ + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ diff --git a/.secrets/.gitignore b/.secrets/.gitignore new file mode 100644 index 0000000..33c6acd --- /dev/null +++ b/.secrets/.gitignore @@ -0,0 +1,10 @@ +# IMPORTANT! This folder is hidden from git - if you need to store config files or other secrets, +# make sure those are never staged for commit into your git repo. You can store them here or another +# secure location. +# +# Note: This may be redundant with the global .gitignore for, and is provided +# for redundancy. If the `.secrets` folder is not needed, you may delete it +# from the project. + +* +!.gitignore diff --git a/README.md b/README.md new file mode 100644 index 0000000..b582376 --- /dev/null +++ b/README.md @@ -0,0 +1,65 @@ +# tap-klaviyo + +This is a [Singer](https://singer.io) tap that produces JSON-formatted +data from the Klaviyo API following the [Singer +spec](https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md). + +This tap: + +- Pulls raw data from the [Klaviyo metrics API](https://www.klaviyo.com/docs/api/metrics) +- Outputs the schema for each resource +- Incrementally pulls data based on the input state for incremental endpoints +- Updates full tables for global exclusions and lists endpoints + +## Quick start + +1. Install + + ```bash + > virtualenv -p python3 venv + > source venv/bin/activate + > pip install tap-klaviyo + ``` + +2. Create the config file + + Create a JSON file containing your API key or client ID and client Secret and start date. + + ```json + { + "api_key": "pk_XYZ", + "start_date": "2017-01-01T00:00:00Z", + "client_id": "client_id", + "client_secret": "client_secret", + } + ``` + +3. [Optional] Create the initial state file + + You can provide JSON file that contains a date for the metrics endpoints to force the application to only fetch events since those dates. If you omit the file it will fetch all + commits and issues. + + ```json + { + "bookmarks": { + "receive": { "since": "2017-04-01T00:00:00Z" }, + "open": { "since": "2017-04-01T00:00:00Z" } + } + } + ``` + +4. [Optional] Run discover command and save catalog into catalog file + + ```bash + tap-klaviyo --config config.json --discover + ``` + +5. Run the application + + `tap-klaviyo` can be run with: + + ```bash + tap-klaviyo --config config.json [--state state.json] [--catalog catalog.json] + ``` + +--- \ No newline at end of file diff --git a/meltano.yml b/meltano.yml new file mode 100644 index 0000000..5d841bd --- /dev/null +++ b/meltano.yml @@ -0,0 +1,25 @@ +version: 1 +send_anonymous_usage_stats: true +project_id: tap-klaviyo-v2 +plugins: + extractors: + - name: tap-klaviyo-v2 + namespace: tap_klaviyo + pip_url: -e . + capabilities: + - state + - catalog + - discover + config: + start_date: '2010-01-01T00:00:00Z' + settings: + # TODO: To configure using Meltano, declare settings and their types here: + - name: username + - name: password + kind: password + - name: start_date + value: '2010-01-01T00:00:00Z' + loaders: + - name: target-jsonl + variant: andyh1203 + pip_url: target-jsonl diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000..ba621de --- /dev/null +++ b/mypy.ini @@ -0,0 +1,6 @@ +[mypy] +python_version = 3.9 +warn_unused_configs = True + +[mypy-backoff.*] +ignore_missing_imports = True diff --git a/output/.gitignore b/output/.gitignore new file mode 100644 index 0000000..80ff9d2 --- /dev/null +++ b/output/.gitignore @@ -0,0 +1,4 @@ +# This directory is used as a target by target-jsonl, so ignore all files + +* +!.gitignore diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..ab143d3 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,39 @@ +[tool.poetry] +name = "tap-klaviyo" +version = "0.0.1" +description = "`tap-klaviyo` is a Singer tap for Klaviyo, built with the Meltano SDK for Singer Taps." +authors = ["Hotglue"] +keywords = [ + "ELT", + "Klaviyo", +] +license = "Apache 2.0" + +[tool.poetry.dependencies] +python = "<3.11,>=3.7.1" +requests = "^2.25.1" +singer-sdk = "^0.5.0" +"backports.cached-property" = "^1.0.1" + +[tool.poetry.dev-dependencies] +pytest = "^6.2.5" +tox = "^3.24.4" +flake8 = "^3.9.2" +black = "^21.9b0" +pydocstyle = "^6.1.1" +mypy = "^0.910" +types-requests = "^2.26.1" +isort = "^5.10.1" + +[tool.isort] +profile = "black" +multi_line_output = 3 # Vertical Hanging Indent +src_paths = "tap_klaviyo" + +[build-system] +requires = ["poetry-core>=1.0.8"] +build-backend = "poetry.core.masonry.api" + +[tool.poetry.scripts] +# CLI declaration +tap-klaviyo-v2 = 'tap_klaviyo.tap:TapKlaviyo.cli' diff --git a/tap_klaviyo/__init__.py b/tap_klaviyo/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tap_klaviyo/auth.py b/tap_klaviyo/auth.py new file mode 100644 index 0000000..b836c75 --- /dev/null +++ b/tap_klaviyo/auth.py @@ -0,0 +1,99 @@ +"""Klaviyo Authentication.""" + + +import json +from datetime import datetime +from typing import Optional + +import backoff +import requests +from singer_sdk.authenticators import OAuthAuthenticator, SingletonMeta +from singer_sdk.streams import Stream as RESTStreamBase + + +class KlaviyoAuthenticator(OAuthAuthenticator, metaclass=SingletonMeta): + """Authenticator class for Klaviyo.""" + + def __init__( + self, + stream: RESTStreamBase, + config_file: Optional[str] = None, + auth_endpoint: Optional[str] = None, + oauth_scopes: Optional[str] = None, + ) -> None: + super().__init__( + stream=stream, auth_endpoint=auth_endpoint, oauth_scopes=oauth_scopes + ) + self._config_file = config_file + self._tap = stream._tap + + @property + def auth_headers(self) -> dict: + if not self.is_token_valid(): + self.update_access_token() + result = {} + result["Authorization"] = f"Bearer {self._tap._config.get('access_token')}" + return result + + def is_token_valid(self) -> bool: + access_token = self._tap._config.get("access_token") + now = round(datetime.utcnow().timestamp()) + expires_in = self._tap._config.get("expires_in") + if expires_in is not None: + expires_in = int(expires_in) + if not access_token: + return False + if not expires_in: + return False + return not ((expires_in - now) < 120) + + @property + def oauth_request_body(self) -> dict: + """Define the OAuth request body for the Klaviyo API.""" + return { + "grant_type": "refresh_token", + "refresh_token": self._tap._config.get("refresh_token"), + } + + @classmethod + def create_for_stream(cls, stream) -> "KlaviyoAuthenticator": + return cls( + stream=stream, + auth_endpoint="https://a.klaviyo.com/oauth/token", + oauth_scopes="", + ) + + @backoff.on_exception(backoff.expo, Exception, max_tries=3) + def update_access_token(self) -> None: + self.logger.info( + f"Oauth request - endpoint: {self._auth_endpoint}, body: {self.oauth_request_body}" + ) + headers = {"Content-Type": "application/x-www-form-urlencoded"} + token_response = requests.post( + self._auth_endpoint, + data=self.oauth_request_body, + headers=headers, + auth=( + self._tap._config.get("client_id"), + self._tap._config.get("client_secret"), + ), + ) + try: + token_response.raise_for_status() + self.logger.info("OAuth authorization attempt was successful.") + except Exception as ex: + self.state.update({"auth_error_response": token_response.text}) + raise RuntimeError( + f"Failed OAuth login, response was '{token_response.text()}'. {ex}" + ) + token_json = token_response.json() + # Log the refresh_token + self.logger.info(f"Latest refresh token: {token_json['refresh_token']}") + self.access_token = token_json["access_token"] + self._tap._config["access_token"] = token_json["access_token"] + self._tap._config["refresh_token"] = token_json["refresh_token"] + now = round(datetime.utcnow().timestamp()) + self._tap._config["expires_in"] = now + token_json["expires_in"] + + with open(self._tap.config_file, "w") as outfile: + json.dump(self._tap._config, outfile, indent=4) diff --git a/tap_klaviyo/client.py b/tap_klaviyo/client.py new file mode 100644 index 0000000..9a40f34 --- /dev/null +++ b/tap_klaviyo/client.py @@ -0,0 +1,199 @@ +"""REST client handling, including KlaviyoStream base class.""" + +import re +from datetime import datetime +from typing import Any, Dict, Optional + +import requests +from backports.cached_property import cached_property +from pendulum import parse +from singer_sdk import typing as th +from singer_sdk.authenticators import APIKeyAuthenticator +from singer_sdk.helpers.jsonpath import extract_jsonpath +from singer_sdk.streams import RESTStream + +from tap_klaviyo.auth import KlaviyoAuthenticator + + +class KlaviyoStream(RESTStream): + """Klaviyo stream class.""" + + url_base = "https://a.klaviyo.com/api" + + records_jsonpath = "$.data[*]" + next_page_token_jsonpath = "$.links.next" + + @property + def authenticator(self): + # auth with hapikey + if self.config.get("api_private_key"): + api_key = f'Klaviyo-API-Key {self.config.get("api_private_key")}' + return APIKeyAuthenticator.create_for_stream( + self, key="Authorization", value=api_key, location="header" + ) + # auth with acces token + return KlaviyoAuthenticator.create_for_stream(self) + + @property + def http_headers(self) -> dict: + """Return the http headers needed.""" + headers = {} + headers["revision"] = "2024-02-15" + if "user_agent" in self.config: + headers["User-Agent"] = self.config.get("user_agent") + return headers + + def get_next_page_token( + self, response: requests.Response, previous_token: Optional[Any] + ) -> Optional[Any]: + """Return a token for identifying next page or None if no more pages.""" + if self.next_page_token_jsonpath: + all_matches = extract_jsonpath( + self.next_page_token_jsonpath, response.json() + ) + token_link = next(iter(all_matches), None) + if token_link: + match = re.search(r"page%5Bcursor%5D=(.*)", token_link) + if match: + return match.group(1) + + def get_starting_time(self, context): + start_date = self.config.get("start_date") + if start_date: + start_date = parse(self.config.get("start_date")) + rep_key = self.get_starting_timestamp(context) + return rep_key or start_date + + def get_url_params( + self, context: Optional[dict], next_page_token: Optional[Any] + ) -> Dict[str, Any]: + """Return a dictionary of values to be used in URL parameterization.""" + params: dict = {} + if next_page_token: + params["page[cursor]"] = next_page_token + start_date = self.get_starting_time(context) + if self.replication_key and start_date: + start_date = start_date.strftime("%Y-%m-%dT%H:%M:%SZ") + params["filter"] = f"greater-than({self.replication_key},{start_date})" + return params + + def post_process(self, row, context): + row = super().post_process(row, context) + rep_key = self.replication_key + if row.get("attributes") and self.replication_key: + row[rep_key] = row["attributes"][rep_key] + return row + + def is_unix_timestamp(self, date): + try: + datetime.strptime(date, "%Y-%m-%dT%H:%M:%S%z") + return True + except: + return False + + def get_jsonschema_type(self, obj): + dtype = type(obj) + + if dtype == int: + return th.IntegerType() + if dtype == float: + return th.NumberType() + if dtype == str: + if self.is_unix_timestamp(obj): + return th.DateTimeType() + return th.StringType() + if dtype == bool: + return th.BooleanType() + if dtype == list: + if len(obj) > 0: + return th.ArrayType(self.get_jsonschema_type(obj[0])) + else: + return th.ArrayType( + th.CustomType({"type": ["number", "string", "object"]}) + ) + if dtype == dict: + obj_props = [] + for key in obj.keys(): + obj_props.append(th.Property(key, self.get_jsonschema_type(obj[key]))) + return th.ObjectType(*obj_props) + else: + return th.CustomType({"type": ["number", "string", "object"]}) + + def get_schema(self) -> dict: + """Dynamically detect the json schema for the stream. + This is evaluated prior to any records being retrieved. + """ + self._requests_session = requests.Session() + # Get the data + headers = self.http_headers + headers.update(self.authenticator.auth_headers) + path = self.path + + request_type = self.rest_method + url = self.url_base + path + + # discover for child streams + if self.parent_stream_type: + parent_url = self.url_base + self.parent_stream_type.path + id = requests.request( + request_type, + parent_url, + headers=headers, + ).json()["data"] + if id: + id = id[0]["id"] + url = url.replace("{id}", id) + + records = requests.request( + request_type, + url, + headers=headers, + ) + if records.status_code == 200: + records = records.json().get("data", []) + else: + raise Exception( + f"There was an error when fetching data for schemas {records.text}" + ) + + if len(records) > 0: + properties = [] + property_names = set() + + # Loop through all records – some objects have different keys + for record in records: + # Loop through each key in the object + for name in record.keys(): + if name in property_names: + continue + # Add the new property to our list + property_names.add(name) + if self.is_unix_timestamp(record[name]): + properties.append(th.Property(name, th.DateTimeType)) + else: + properties.append( + th.Property(name, self.get_jsonschema_type(record[name])) + ) + # if the rep_key is not at a header level add updated as default + if ( + self.replication_key is not None + and self.replication_key not in record.keys() + ): + properties.append( + th.Property(self.replication_key, th.DateTimeType) + ) + # we need to process only first record + break + # Return the list as a JSON Schema dictionary object + property_list = th.PropertiesList(*properties).to_dict() + + return property_list + else: + return th.PropertiesList( + th.Property("id", th.StringType), + th.Property(self.replication_key, th.DateTimeType), + ).to_dict() + + @cached_property + def schema(self) -> dict: + return self.get_schema() diff --git a/tap_klaviyo/streams.py b/tap_klaviyo/streams.py new file mode 100644 index 0000000..c694249 --- /dev/null +++ b/tap_klaviyo/streams.py @@ -0,0 +1,52 @@ +"""Stream type classes for tap-klaviyo.""" + +from tap_klaviyo.client import KlaviyoStream + + +class ContactsStream(KlaviyoStream): + """Define custom stream.""" + + name = "contacts" + path = "/profiles" + primary_keys = ["id"] + replication_key = "updated" + + +class ListsStream(KlaviyoStream): + """Define custom stream.""" + + name = "lists" + path = "/lists" + primary_keys = ["id"] + replication_key = "updated" + + def get_child_context(self, record, context): + return {"id": record["id"]} + + +class MetricsStream(KlaviyoStream): + """Define custom stream.""" + + name = "metrics" + path = "/metrics" + primary_keys = ["id"] + replication_key = None + + +class EventsStream(KlaviyoStream): + """Define custom stream.""" + + name = "events" + path = "/events" + primary_keys = ["id"] + replication_key = "updated" + + +class ListMembersStream(KlaviyoStream): + """Define custom stream.""" + + name = "list_members" + path = "/lists/{id}/profiles" + primary_keys = ["id"] + replication_key = "joined_group_at" + parent_stream_type = ListsStream diff --git a/tap_klaviyo/tap.py b/tap_klaviyo/tap.py new file mode 100644 index 0000000..ef79886 --- /dev/null +++ b/tap_klaviyo/tap.py @@ -0,0 +1,66 @@ +"""Klaviyo tap class.""" + +from typing import List + +from singer_sdk import Stream, Tap +from singer_sdk import typing as th + +from tap_klaviyo.streams import ( + ContactsStream, + EventsStream, + ListMembersStream, + ListsStream, + MetricsStream, +) + +STREAM_TYPES = [ + ContactsStream, + ListsStream, + MetricsStream, + EventsStream, + ListMembersStream, +] + + +class TapKlaviyo(Tap): + """Klaviyo tap class.""" + + name = "tap-klaviyo" + + def __init__( + self, + config=None, + catalog=None, + state=None, + parse_env_config=False, + validate_config=True, + ) -> None: + self.config_file = config[0] + super().__init__(config, catalog, state, parse_env_config, validate_config) + + config_jsonschema = th.PropertiesList( + th.Property( + "client_id", + th.StringType, + ), + th.Property( + "client_secret", + th.StringType, + ), + th.Property( + "refresh_token", + th.StringType, + ), + th.Property( + "api_private_key", + th.StringType, + ), + ).to_dict() + + def discover_streams(self) -> List[Stream]: + """Return a list of discovered streams.""" + return [stream_class(tap=self) for stream_class in STREAM_TYPES] + + +if __name__ == "__main__": + TapKlaviyo.cli() diff --git a/tap_klaviyo/tests/__init__.py b/tap_klaviyo/tests/__init__.py new file mode 100644 index 0000000..8503f0a --- /dev/null +++ b/tap_klaviyo/tests/__init__.py @@ -0,0 +1 @@ +"""Test suite for tap-klaviyo.""" diff --git a/tap_klaviyo/tests/test_core.py b/tap_klaviyo/tests/test_core.py new file mode 100644 index 0000000..c16a421 --- /dev/null +++ b/tap_klaviyo/tests/test_core.py @@ -0,0 +1,18 @@ +"""Tests standard tap features using the built-in SDK tests library.""" + +import datetime + +from singer_sdk.testing import get_standard_tap_tests +from tap_klaviyo_v2.tap import TapKlaviyo + +SAMPLE_CONFIG = { + "start_date": datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d") +} + + +# Run standard built-in tap tests from the SDK: +def test_standard_tap_tests(): + """Run standard tap tests from the SDK.""" + tests = get_standard_tap_tests(TapKlaviyo, config=SAMPLE_CONFIG) + for test in tests: + test() diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..66e52b2 --- /dev/null +++ b/tox.ini @@ -0,0 +1,53 @@ +# This file can be used to customize tox tests as well as other test frameworks like flake8 and mypy + +[tox] +envlist = py38 +; envlist = py37, py38, py39 +isolated_build = true + +[testenv] +whitelist_externals = poetry + +commands = + poetry install -v + poetry run pytest + poetry run black --check tap_klaviyo/ + poetry run flake8 tap_klaviyo + poetry run pydocstyle tap_klaviyo + poetry run mypy tap_klaviyo --exclude='tap_klaviyo/tests' + +[testenv:pytest] +# Run the python tests. +# To execute, run `tox -e pytest` +envlist = py37, py38, py39 +commands = + poetry install -v + poetry run pytest + +[testenv:format] +# Attempt to auto-resolve lint errors before they are raised. +# To execute, run `tox -e format` +commands = + poetry install -v + poetry run black tap_klaviyo/ + poetry run isort tap_klaviyo + +[testenv:lint] +# Raise an error if lint and style standards are not met. +# To execute, run `tox -e lint` +commands = + poetry install -v + poetry run black --check --diff tap_klaviyo/ + poetry run isort --check tap_klaviyo + poetry run flake8 tap_klaviyo + poetry run pydocstyle tap_klaviyo + # refer to mypy.ini for specific settings + poetry run mypy tap_klaviyo --exclude='tap_klaviyo/tests' + +[flake8] +ignore = W503 +max-line-length = 88 +max-complexity = 10 + +[pydocstyle] +ignore = D105,D203,D213