From eab4c907d1dd12d855c601441900e6a0ebf987be Mon Sep 17 00:00:00 2001
From: xacadil <92389481+xacadil@users.noreply.github.com>
Date: Mon, 12 Feb 2024 23:50:41 +0500
Subject: [PATCH] Discover fix for custom fields from HGI-5271 (#2)

---
 .../recents/dynamic_typing/__init__.py        | 44 +++++++++++++------
 tap_pipedrive/tap.py                          |  6 ++-
 2 files changed, 35 insertions(+), 15 deletions(-)

diff --git a/tap_pipedrive/streams/recents/dynamic_typing/__init__.py b/tap_pipedrive/streams/recents/dynamic_typing/__init__.py
index addde66..34e6418 100644
--- a/tap_pipedrive/streams/recents/dynamic_typing/__init__.py
+++ b/tap_pipedrive/streams/recents/dynamic_typing/__init__.py
@@ -13,7 +13,23 @@ class DynamicTypingRecentsStream(RecentsStream):
     fields_more_items_in_collection = True
     fields_start = 0
     fields_limit = 100
-
+    schema_mapping = {}
+
+    def clean_string(self,string):
+        return string.replace(" ", "_").replace("-", "_").replace("/", "_").replace("(", "").replace(")", "").replace(".", "").replace(",", "").replace(":", "").replace(";", "").replace("&", "and").replace("'", "").replace('"', "").lower()
+    def get_fields_response(self,limit,start):
+        fields_params = {"limit" : limit, "start" : start}
+        try:
+            fields_response = self.tap.execute_request(endpoint=self.fields_endpoint, params=fields_params)
+        except (ConnectionError, RequestException) as e:
+            raise e
+        return fields_response
+    def get_schema_mapping(self):
+        if self.schema_mapping:
+            return self.schema_mapping
+        else:
+            self.get_schema()
+            return self.schema_mapping
 
     def get_schema(self):
 
         if not self.schema_cache:
@@ -21,24 +37,24 @@ def get_schema(self):
 
             while self.fields_more_items_in_collection:
 
-                fields_params = {"limit" : self.fields_limit, "start" : self.fields_start}
-
-                try:
-                    fields_response = self.tap.execute_request(endpoint=self.fields_endpoint, params=fields_params)
-                except (ConnectionError, RequestException) as e:
-                    raise e
-
+                fields_response = self.get_fields_response(self.fields_limit,self.fields_start)
                 try:
                     payload = fields_response.json() # Verifying response in execute_request
 
                     for property in payload['data']:
-                        if property['key'] not in self.static_fields:
-                            logger.debug(property['key'], property['field_type'], property['mandatory_flag'])
-
-                            if property['key'] in schema['properties']:
+                        key = f"{property['key']}"
+                        if property.get("edit_flag",False):
+                            key = self.clean_string(property['name'])
+                        if property.get("is_subfield"):
+                            key = self.clean_string(property['name'])
+                        self.schema_mapping[property['key']] = key
+                        if key not in self.static_fields:
+                            logger.debug(key, property['field_type'], property['mandatory_flag'])
+
+                            if key in schema['properties']:
                                 logger.warn('Dynamic property "{}" overrides with type {} existing entry in ' \
                                             'static JSON schema of {} stream.'.format(
-                                                property['key'],
+                                                key,
                                                 property['field_type'],
                                                 self.schema
                                             )
@@ -64,7 +80,7 @@ def get_schema(self):
                             # mandatory for another amount of time
                             property_content['type'].append('null')
 
-                        schema['properties'][property['key']] = property_content
+                        schema['properties'][key] = property_content
 
                 # Check for more data is available in next page
                 if 'additional_data' in payload and 'pagination' in payload['additional_data']:
diff --git a/tap_pipedrive/tap.py b/tap_pipedrive/tap.py
index 9786ade..30e6793 100644
--- a/tap_pipedrive/tap.py
+++ b/tap_pipedrive/tap.py
@@ -268,15 +268,19 @@ def do_paginate(self, stream, stream_metadata):
             self.validate_response(response)
             self.rate_throttling(response)
             stream.paginate(response)
+            schema_mapping = stream.get_schema_mapping()
 
             # records with metrics
             with singer.metrics.record_counter(stream.schema) as counter:
                 with singer.Transformer(singer.NO_INTEGER_DATETIME_PARSING) as optimus_prime:
                     for row in self.iterate_response(response):
                         row = stream.process_row(row)
-
                         if not row: # in case of a non-empty response with an empty element
                             continue
+                        row_keys = list(row.keys())
+                        for row_key in row_keys:
+                            if row_key in schema_mapping:
+                                row[schema_mapping[row_key]] = row.pop(row_key)
                         row = optimus_prime.transform(row, stream.get_schema(), stream_metadata)
                         if stream.write_record(row):
                             counter.increment()