Skip to content

Commit

Permalink
Discover fix for custom fields from HGI-5271 (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
xacadil authored Feb 12, 2024
1 parent d59acea commit eab4c90
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 15 deletions.
44 changes: 30 additions & 14 deletions tap_pipedrive/streams/recents/dynamic_typing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,48 @@ class DynamicTypingRecentsStream(RecentsStream):
fields_more_items_in_collection = True
fields_start = 0
fields_limit = 100

schema_mapping = {}

def clean_string(self,string):
return string.replace(" ", "_").replace("-", "_").replace("/", "_").replace("(", "").replace(")", "").replace(".", "").replace(",", "").replace(":", "").replace(";", "").replace("&", "and").replace("'", "").replace('"', "").lower()
def get_fields_response(self,limit,start):
fields_params = {"limit" : limit, "start" : start}
try:
fields_response = self.tap.execute_request(endpoint=self.fields_endpoint, params=fields_params)
except (ConnectionError, RequestException) as e:
raise e
return fields_response
def get_schema_mapping(self):
if self.schema_mapping:
return self.schema_mapping
else:
self.get_schema()
return self.schema_mapping

def get_schema(self):
if not self.schema_cache:
schema = self.load_schema()

while self.fields_more_items_in_collection:

fields_params = {"limit" : self.fields_limit, "start" : self.fields_start}

try:
fields_response = self.tap.execute_request(endpoint=self.fields_endpoint, params=fields_params)
except (ConnectionError, RequestException) as e:
raise e

fields_response = self.get_fields_response(self.fields_limit,self.fields_start)
try:
payload = fields_response.json() # Verifying response in execute_request

for property in payload['data']:
if property['key'] not in self.static_fields:
logger.debug(property['key'], property['field_type'], property['mandatory_flag'])

if property['key'] in schema['properties']:
key = f"{property['key']}"
if property.get("edit_flag",False):
key = self.clean_string(property['name'])
if property.get("is_subfield"):
key = self.clean_string(property['name'])
self.schema_mapping[property['key']] = key
if key not in self.static_fields:
logger.debug(key, property['field_type'], property['mandatory_flag'])

if key in schema['properties']:
logger.warn('Dynamic property "{}" overrides with type {} existing entry in ' \
'static JSON schema of {} stream.'.format(
property['key'],
key,
property['field_type'],
self.schema
)
Expand All @@ -64,7 +80,7 @@ def get_schema(self):
# mandatory for another amount of time
property_content['type'].append('null')

schema['properties'][property['key']] = property_content
schema['properties'][key] = property_content

# Check for more data is available in next page
if 'additional_data' in payload and 'pagination' in payload['additional_data']:
Expand Down
6 changes: 5 additions & 1 deletion tap_pipedrive/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,15 +268,19 @@ def do_paginate(self, stream, stream_metadata):
self.validate_response(response)
self.rate_throttling(response)
stream.paginate(response)
schema_mapping = stream.get_schema_mapping()

# records with metrics
with singer.metrics.record_counter(stream.schema) as counter:
with singer.Transformer(singer.NO_INTEGER_DATETIME_PARSING) as optimus_prime:
for row in self.iterate_response(response):
row = stream.process_row(row)

if not row: # in case of a non-empty response with an empty element
continue
row_keys = list(row.keys())
for row_key in row_keys:
if row_key in schema_mapping:
row[schema_mapping[row_key]] = row.pop(row_key)
row = optimus_prime.transform(row, stream.get_schema(), stream_metadata)
if stream.write_record(row):
counter.increment()
Expand Down

0 comments on commit eab4c90

Please sign in to comment.