Skip to content

Commit

Permalink
Enhance JSON schema type reinforcement in KlaviyoStream to support va…
Browse files Browse the repository at this point in the history
…rious data types, including lists and dictionaries. This update improves data handling during post-processing by ensuring type consistency according to the defined schema.
  • Loading branch information
butkeraites-hotglue committed Nov 29, 2024
1 parent 5a7f62e commit dce9422
Showing 1 changed file with 28 additions and 0 deletions.
28 changes: 28 additions & 0 deletions tap_klaviyo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,38 @@ def get_url_params(
params["filter"] = f"greater-than({self.replication_key},{start_date})"
return params

def _reinforce_jsonschema_type(self, obj, sub_schema):
if "null" in sub_schema["type"] and obj is None:
return None
if "integer" in sub_schema["type"]:
return int(obj)
if "number" in sub_schema["type"]:
return float(obj)
if "string" in sub_schema["type"]:
if self.is_unix_timestamp(obj):
return obj
return str(obj)
if "boolean" in sub_schema["type"]:
return bool(obj)
if type(obj) == list:
if len(obj) > 0:
return [self._reinforce_jsonschema_type(item, sub_schema["items"]) for item in obj]
else:
return []
if type(obj) == dict:
reinforced_obj = {}
for key in obj.keys():
reinforced_obj[key] = self._reinforce_jsonschema_type(obj[key], sub_schema["properties"][key])
return reinforced_obj
else:
raise Exception(f"Unsupported type: {type(obj)}")

def post_process(self, row, context):
row = super().post_process(row, context)
for key, value in row.get("attributes", {}).items():
row[key] = value
row.pop("attributes", None)
row = self._reinforce_jsonschema_type(row, self.schema)
return row

def is_unix_timestamp(self, date):
Expand Down

0 comments on commit dce9422

Please sign in to comment.