Skip to content

Commit

Permalink
HGI-6592 / Add field_meta and stream_meta keys (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
arilton authored Oct 8, 2024
1 parent ac385c2 commit 97b0457
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 4 deletions.
4 changes: 2 additions & 2 deletions tap_pipedrive/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ def main():
pipedrive_tap = PipedriveTap(args.config, args.state)

if args.discover:
catalog = pipedrive_tap.do_discover()
json.dump(catalog.to_dict(), sys.stdout, indent=2)
catalog = pipedrive_tap.do_discover(return_dict=True)
json.dump(catalog, sys.stdout, indent=2)
logger.info('Finished discover')
else:
if args.catalog:
Expand Down
1 change: 1 addition & 0 deletions tap_pipedrive/streams/activity_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

class ActivityTypesStream(PipedriveStream):
endpoint = 'activityTypes'
metadata_endpoint = 'activityFields'
schema = 'activity_types'
key_properties = ['id']
state_field = 'update_time'
Expand Down
1 change: 1 addition & 0 deletions tap_pipedrive/streams/deal_products.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
class DealsProductsStream(PipedriveIterStream):
base_endpoint = 'deals'
id_endpoint = 'deals/{}/products'
metadata_endpoint = 'productFields'
schema = 'deal_products'
key_properties = ['id']
replication_method = 'INCREMENTAL'
Expand Down
1 change: 1 addition & 0 deletions tap_pipedrive/streams/notes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

class NotesStream(PipedriveStream):
endpoint = "notes"
metadata_endpoint = 'noteFields'
schema = "notes"
key_properties = ["id"]
replication_method = "INCREMENTAL"
Expand Down
40 changes: 38 additions & 2 deletions tap_pipedrive/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,11 @@ def __init__(self, config, state):
self.config['start_date'] = pendulum.parse(self.config['start_date'])
self.state = state

def do_discover(self):
def do_discover(self, return_dict=False):
logger.info('Starting discover')

catalog = Catalog([])
catalog_stream_meta_dict = {}

for stream in self.streams:
stream.tap = self
Expand Down Expand Up @@ -156,7 +157,42 @@ def do_discover(self):
schema=schema,
metadata=meta
))

catalog_stream_meta_dict[stream.schema] = meta
if return_dict:
cd = catalog.to_dict()
for catalog_stream in cd.get('streams', []):
data = []
catalog_stream['stream_meta'] = catalog_stream_meta_dict[catalog_stream['stream']]
try:
stream = next(filter(lambda stream: stream.schema == catalog_stream['stream'], self.streams))
if getattr(stream, 'metadata_endpoint', None):
response = self.execute_request(stream.metadata_endpoint)
res_json = response.json()
if 'data' in res_json:
data = res_json['data']
is_more_pages = res_json.get('additional_data', {}).get('pagination', {}).get('more_items_in_collection', False)
start = 0
while is_more_pages:
start += 500
response = self.execute_request(stream.metadata_endpoint, {'start': start})
res_json = response.json()
data += res_json['data']
is_more_pages = res_json.get('additional_data', {}).get('pagination', {}).get('more_items_in_collection', False)
except Exception as exc:
logger.warning(f'Failed to find matched catalog. catalog_stream={catalog_stream} and stream={stream}. Error: {exc}')
schema = Schema.from_dict(stream.get_schema())
for field_key in schema.properties.keys():
catalog_stream['schema']['properties'][field_key]['field_meta'] = {}
if data:
try:
field_metadata = list(filter(lambda item: item['key'] == field_key, data))
if field_metadata:
field_metadata = field_metadata[0]
field_metadata['label'] = field_metadata['name']
catalog_stream['schema']['properties'][field_key]['field_meta'] = field_metadata
except Exception as exc:
logger.warning(f'Failed to find the field={field_key} in data. Error: {exc}')
return cd
return catalog

def do_sync(self, catalog):
Expand Down

0 comments on commit 97b0457

Please sign in to comment.