Skip to content

Commit

Permalink
Enhance JSON schema type reinforcement in KlaviyoStream to support va… (
Browse files Browse the repository at this point in the history
#9)

* Enhance JSON schema type reinforcement in KlaviyoStream to support various data types, including lists and dictionaries. This update improves data handling during post-processing by ensuring type consistency according to the defined schema.

* Improve type handling in KlaviyoStream by adding exception handling for integer, float, boolean, and string conversions. Update schema type definitions to support multiple types (string, number, object) for empty objects and arrays, enhancing data validation and consistency.

* Remove redundant JSON schema type reinforcement method in KlaviyoStream to streamline post-processing. This change simplifies the code and enhances maintainability by relying on existing type handling mechanisms.
  • Loading branch information
butkeraites-hotglue authored Nov 30, 2024
1 parent 5a7f62e commit 4849ecb
Showing 1 changed file with 6 additions and 3 deletions.
9 changes: 6 additions & 3 deletions tap_klaviyo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,13 @@ def get_url_params(
params["filter"] = f"greater-than({self.replication_key},{start_date})"
return params



def post_process(self, row, context):
row = super().post_process(row, context)
for key, value in row.get("attributes", {}).items():
row[key] = value
row.pop("attributes", None)
return row

def is_unix_timestamp(self, date):
Expand Down Expand Up @@ -124,17 +127,17 @@ def get_jsonschema_type(self, obj):
return th.ArrayType(self.get_jsonschema_type(obj[0]))
else:
return th.ArrayType(
th.CustomType({"type": ["string"]})
th.CustomType({"type": ["string", "number", "object"]})
)
if dtype == dict:
obj_props = []
for key in obj.keys():
obj_props.append(th.Property(key, self.get_jsonschema_type(obj[key])))
if not obj_props:
return th.CustomType({"type": ["string"]})
return th.CustomType({"type": ["string", "number", "object"]})
return th.ObjectType(*obj_props)
else:
return th.CustomType({"type": ["string"]})
return th.CustomType({"type": ["string", "number", "object"]})

def get_schema(self) -> dict:
"""Dynamically detect the json schema for the stream.
Expand Down

0 comments on commit 4849ecb

Please sign in to comment.