From 97b0457f10600f4001d379afc52766eaff0ccdef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Arilton=20Pereira=20Filho?= Date: Tue, 8 Oct 2024 18:50:44 +0200 Subject: [PATCH] HGI-6592 / Add field_meta and stream_meta keys (#6) --- tap_pipedrive/cli.py | 4 +-- tap_pipedrive/streams/activity_types.py | 1 + tap_pipedrive/streams/deal_products.py | 1 + tap_pipedrive/streams/notes.py | 1 + tap_pipedrive/tap.py | 40 +++++++++++++++++++++++-- 5 files changed, 43 insertions(+), 4 deletions(-) diff --git a/tap_pipedrive/cli.py b/tap_pipedrive/cli.py index a6dd389..2281c3b 100644 --- a/tap_pipedrive/cli.py +++ b/tap_pipedrive/cli.py @@ -15,8 +15,8 @@ def main(): pipedrive_tap = PipedriveTap(args.config, args.state) if args.discover: - catalog = pipedrive_tap.do_discover() - json.dump(catalog.to_dict(), sys.stdout, indent=2) + catalog = pipedrive_tap.do_discover(return_dict=True) + json.dump(catalog, sys.stdout, indent=2) logger.info('Finished discover') else: if args.catalog: diff --git a/tap_pipedrive/streams/activity_types.py b/tap_pipedrive/streams/activity_types.py index c40ff7a..e31f88a 100644 --- a/tap_pipedrive/streams/activity_types.py +++ b/tap_pipedrive/streams/activity_types.py @@ -3,6 +3,7 @@ class ActivityTypesStream(PipedriveStream): endpoint = 'activityTypes' + metadata_endpoint = 'activityFields' schema = 'activity_types' key_properties = ['id'] state_field = 'update_time' diff --git a/tap_pipedrive/streams/deal_products.py b/tap_pipedrive/streams/deal_products.py index 30119f6..950d06f 100644 --- a/tap_pipedrive/streams/deal_products.py +++ b/tap_pipedrive/streams/deal_products.py @@ -4,6 +4,7 @@ class DealsProductsStream(PipedriveIterStream): base_endpoint = 'deals' id_endpoint = 'deals/{}/products' + metadata_endpoint = 'productFields' schema = 'deal_products' key_properties = ['id'] replication_method = 'INCREMENTAL' diff --git a/tap_pipedrive/streams/notes.py b/tap_pipedrive/streams/notes.py index 66be6ee..1428f7e 100644 --- a/tap_pipedrive/streams/notes.py +++ b/tap_pipedrive/streams/notes.py @@ -3,6 +3,7 @@ class NotesStream(PipedriveStream): endpoint = "notes" + metadata_endpoint = 'noteFields' schema = "notes" key_properties = ["id"] replication_method = "INCREMENTAL" diff --git a/tap_pipedrive/tap.py b/tap_pipedrive/tap.py index d18dc50..36b25d2 100644 --- a/tap_pipedrive/tap.py +++ b/tap_pipedrive/tap.py @@ -118,10 +118,11 @@ def __init__(self, config, state): self.config['start_date'] = pendulum.parse(self.config['start_date']) self.state = state - def do_discover(self): + def do_discover(self, return_dict=False): logger.info('Starting discover') catalog = Catalog([]) + catalog_stream_meta_dict = {} for stream in self.streams: stream.tap = self @@ -156,7 +157,42 @@ def do_discover(self): schema=schema, metadata=meta )) - + catalog_stream_meta_dict[stream.schema] = meta + if return_dict: + cd = catalog.to_dict() + for catalog_stream in cd.get('streams', []): + data = [] + catalog_stream['stream_meta'] = catalog_stream_meta_dict[catalog_stream['stream']] + try: + stream = next(filter(lambda stream: stream.schema == catalog_stream['stream'], self.streams)) + if getattr(stream, 'metadata_endpoint', None): + response = self.execute_request(stream.metadata_endpoint) + res_json = response.json() + if 'data' in res_json: + data = res_json['data'] + is_more_pages = res_json.get('additional_data', {}).get('pagination', {}).get('more_items_in_collection', False) + start = 0 + while is_more_pages: + start += 500 + response = self.execute_request(stream.metadata_endpoint, {'start': start}) + res_json = response.json() + data += res_json['data'] + is_more_pages = res_json.get('additional_data', {}).get('pagination', {}).get('more_items_in_collection', False) + except Exception as exc: + logger.warning(f'Failed to find matched catalog. catalog_stream={catalog_stream} and stream={stream}. Error: {exc}') + schema = Schema.from_dict(stream.get_schema()) + for field_key in schema.properties.keys(): + catalog_stream['schema']['properties'][field_key]['field_meta'] = {} + if data: + try: + field_metadata = list(filter(lambda item: item['key'] == field_key, data)) + if field_metadata: + field_metadata = field_metadata[0] + field_metadata['label'] = field_metadata['name'] + catalog_stream['schema']['properties'][field_key]['field_meta'] = field_metadata + except Exception as exc: + logger.warning(f'Failed to find the field={field_key} in data. Error: {exc}') + return cd return catalog def do_sync(self, catalog):