Skip to content

Commit

Permalink
bump schema and change api_key name (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
keyn4 authored Apr 9, 2024
1 parent 6900c45 commit b369353
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 29 deletions.
59 changes: 32 additions & 27 deletions tap_klaviyo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ class KlaviyoStream(RESTStream):
@property
def authenticator(self):
# auth with hapikey
if self.config.get("api_private_key"):
api_key = f'Klaviyo-API-Key {self.config.get("api_private_key")}'
if self.config.get("api_key"):
api_key = f'Klaviyo-API-Key {self.config.get("api_key")}'
return APIKeyAuthenticator.create_for_stream(
self, key="Authorization", value=api_key, location="header"
)
Expand Down Expand Up @@ -79,9 +79,8 @@ def get_url_params(

def post_process(self, row, context):
row = super().post_process(row, context)
rep_key = self.replication_key
if row.get("attributes") and self.replication_key:
row[rep_key] = row["attributes"][rep_key]
for key, value in row.get("attributes", {}).items():
row[key] = value
return row

def is_unix_timestamp(self, date):
Expand Down Expand Up @@ -160,30 +159,36 @@ def get_schema(self) -> dict:
properties = []
property_names = set()

# Loop through all records – some objects have different keys
for record in records:
# Loop through each key in the object
for name in record.keys():
if name in property_names:
continue
# Add the new property to our list
property_names.add(name)
if self.is_unix_timestamp(record[name]):
properties.append(th.Property(name, th.DateTimeType))
else:
properties.append(
th.Property(name, self.get_jsonschema_type(record[name]))
)
# if the rep_key is not at a header level add updated as default
if (
self.replication_key is not None
and self.replication_key not in record.keys()
):
record = records[0]
# put attributes fields at header level
attributes = record.pop("attributes", {})
if attributes:
record.update(attributes)

# Loop through each key in the object
for name in record.keys():
if name in property_names:
continue
# Add the new property to our list
property_names.add(name)
if name in ["event_properties", "properties"]:
properties.append(
th.Property(self.replication_key, th.DateTimeType)
th.Property(name, th.CustomType({"type": ["object", "string"]}))
)
elif self.is_unix_timestamp(record[name]):
properties.append(th.Property(name, th.DateTimeType))
else:
properties.append(
th.Property(name, self.get_jsonschema_type(record[name]))
)
# we need to process only first record
break
# if the rep_key is not at a header level add updated as default
if (
self.replication_key is not None
and self.replication_key not in record.keys()
):
properties.append(
th.Property(self.replication_key, th.DateTimeType)
)
# Return the list as a JSON Schema dictionary object
property_list = th.PropertiesList(*properties).to_dict()

Expand Down
2 changes: 1 addition & 1 deletion tap_klaviyo/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class EventsStream(KlaviyoStream):
name = "events"
path = "/events"
primary_keys = ["id"]
replication_key = "updated"
replication_key = "datetime"


class ListMembersStream(KlaviyoStream):
Expand Down
2 changes: 1 addition & 1 deletion tap_klaviyo/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def __init__(
th.StringType,
),
th.Property(
"api_private_key",
"api_key",
th.StringType,
),
).to_dict()
Expand Down

0 comments on commit b369353

Please sign in to comment.