From 2ba239b4778ee7405aa574ef4e680cc101d211fe Mon Sep 17 00:00:00 2001 From: Key Date: Mon, 11 Mar 2024 17:24:28 -0500 Subject: [PATCH 1/3] add concurrency to the discover --- tap_salesforce/__init__.py | 80 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/tap_salesforce/__init__.py b/tap_salesforce/__init__.py index 58545259..eabb1e56 100644 --- a/tap_salesforce/__init__.py +++ b/tap_salesforce/__init__.py @@ -11,6 +11,7 @@ from tap_salesforce.salesforce.bulk import Bulk from tap_salesforce.salesforce.exceptions import ( TapSalesforceException, TapSalesforceQuotaExceededException, TapSalesforceBulkAPIDisabledException) +from concurrent.futures import ThreadPoolExecutor, as_completed LOGGER = singer.get_logger() @@ -252,7 +253,70 @@ def get_views_list(sf): return responses +def discover_stream(sf, sobject_name, entries, sf_custom_setting_objects, object_to_tag_references): + # Skip blacklisted SF objects depending on the api_type in use + # ChangeEvent objects are not queryable via Bulk or REST (undocumented) + if (sobject_name in sf.get_blacklisted_objects() and sobject_name not in ACTIVITY_STREAMS) \ + or sobject_name.endswith("ChangeEvent"): + return + + # append + + sobject_description = sf.describe(sobject_name) + + if sobject_description is None: + return + + # Cache customSetting and Tag objects to check for blacklisting after + # all objects have been described + if sobject_description.get("customSetting"): + sf_custom_setting_objects.append(sobject_name) + elif sobject_name.endswith("__Tag"): + relationship_field = next( + (f for f in sobject_description["fields"] if f.get("relationshipName") == "Item"), + None) + if relationship_field: + # Map {"Object":"Object__Tag"} + object_to_tag_references[relationship_field["referenceTo"] + [0]] = sobject_name + + fields = sobject_description['fields'] + replication_key = get_replication_key(sobject_name, fields) + + # Salesforce Objects are skipped when they do not have an Id field + if not [f["name"] for f in fields if f["name"]=="Id"]: + LOGGER.info( + "Skipping Salesforce Object %s, as it has no Id field", + sobject_name) + return + + entry = generate_schema(fields, sf, sobject_name, replication_key) + entries.append(entry) + +def run_concurrently(fn, fn_args_list): + all_tasks = [] + + # This function is used to garantee the right other of returns. + # It saves the index of the `fn_args` used to call `fn`. + def fn_with_index(index, *fn_args): + result = fn(*fn_args) + return (index, result) + + with ThreadPoolExecutor(max_workers=5) as executor: + for (index, fn_args) in enumerate(fn_args_list): + all_tasks.append(executor.submit( + fn_with_index, + *[index, *fn_args] + )) + + results = [] + for future in as_completed(all_tasks): + (index, result) = future.result() + # Insert the result in the right index of the list + results.insert(index, result) + + return results # pylint: disable=too-many-branches,too-many-statements def do_discover(sf): @@ -271,6 +335,20 @@ def do_discover(sf): if sf.api_type == 'BULK' and not Bulk(sf).has_permissions(): raise TapSalesforceBulkAPIDisabledException('This client does not have Bulk API permissions, received "API_DISABLED_FOR_ORG" error code') + objects_list = sorted(objects_to_discover) + start_counter = 0 + concurrency_limit = 25 + + while start_counter < len(objects_list): + end_counter = start_counter + concurrency_limit + if end_counter >= len(objects_list): + end_counter = len(objects_list) + + chunk = objects_list[start_counter:end_counter] + chunk_args = [[sf, sobject_name, entries, sf_custom_setting_objects, object_to_tag_references] for sobject_name in chunk] + run_concurrently(discover_stream, chunk_args) + start_counter = end_counter + for sobject_name in sorted(objects_to_discover): # Skip blacklisted SF objects depending on the api_type in use @@ -279,6 +357,8 @@ def do_discover(sf): or sobject_name.endswith("ChangeEvent"): continue + # append + sobject_description = sf.describe(sobject_name) if sobject_description is None: From a404ed74b6d232e5c3eaecd77ba1416a5838d0da Mon Sep 17 00:00:00 2001 From: Key Date: Mon, 11 Mar 2024 17:39:00 -0500 Subject: [PATCH 2/3] format --- tap_salesforce/__init__.py | 62 +++++++++++++++++++++++--------------- 1 file changed, 38 insertions(+), 24 deletions(-) diff --git a/tap_salesforce/__init__.py b/tap_salesforce/__init__.py index eabb1e56..2e480f3a 100644 --- a/tap_salesforce/__init__.py +++ b/tap_salesforce/__init__.py @@ -253,15 +253,17 @@ def get_views_list(sf): return responses -def discover_stream(sf, sobject_name, entries, sf_custom_setting_objects, object_to_tag_references): +def discover_stream( + sf, sobject_name, entries, sf_custom_setting_objects, object_to_tag_references +): # Skip blacklisted SF objects depending on the api_type in use # ChangeEvent objects are not queryable via Bulk or REST (undocumented) - if (sobject_name in sf.get_blacklisted_objects() and sobject_name not in ACTIVITY_STREAMS) \ - or sobject_name.endswith("ChangeEvent"): + if ( + sobject_name in sf.get_blacklisted_objects() + and sobject_name not in ACTIVITY_STREAMS + ) or sobject_name.endswith("ChangeEvent"): return - # append - sobject_description = sf.describe(sobject_name) if sobject_description is None: @@ -273,26 +275,33 @@ def discover_stream(sf, sobject_name, entries, sf_custom_setting_objects, object sf_custom_setting_objects.append(sobject_name) elif sobject_name.endswith("__Tag"): relationship_field = next( - (f for f in sobject_description["fields"] if f.get("relationshipName") == "Item"), - None) + ( + f + for f in sobject_description["fields"] + if f.get("relationshipName") == "Item" + ), + None, + ) if relationship_field: # Map {"Object":"Object__Tag"} - object_to_tag_references[relationship_field["referenceTo"] - [0]] = sobject_name + object_to_tag_references[ + relationship_field["referenceTo"][0] + ] = sobject_name - fields = sobject_description['fields'] + fields = sobject_description["fields"] replication_key = get_replication_key(sobject_name, fields) # Salesforce Objects are skipped when they do not have an Id field - if not [f["name"] for f in fields if f["name"]=="Id"]: + if not [f["name"] for f in fields if f["name"] == "Id"]: LOGGER.info( - "Skipping Salesforce Object %s, as it has no Id field", - sobject_name) + "Skipping Salesforce Object %s, as it has no Id field", sobject_name + ) return entry = generate_schema(fields, sf, sobject_name, replication_key) entries.append(entry) + def run_concurrently(fn, fn_args_list): all_tasks = [] @@ -304,10 +313,7 @@ def fn_with_index(index, *fn_args): with ThreadPoolExecutor(max_workers=5) as executor: for (index, fn_args) in enumerate(fn_args_list): - all_tasks.append(executor.submit( - fn_with_index, - *[index, *fn_args] - )) + all_tasks.append(executor.submit(fn_with_index, *[index, *fn_args])) results = [] @@ -323,7 +329,7 @@ def do_discover(sf): """Describes a Salesforce instance's objects and generates a JSON schema for each field.""" global_description = sf.describe() - objects_to_discover = {o['name'] for o in global_description['sobjects']} + objects_to_discover = {o["name"] for o in global_description["sobjects"]} sf_custom_setting_objects = [] object_to_tag_references = {} @@ -332,20 +338,30 @@ def do_discover(sf): entries = [] # Check if the user has BULK API enabled - if sf.api_type == 'BULK' and not Bulk(sf).has_permissions(): - raise TapSalesforceBulkAPIDisabledException('This client does not have Bulk API permissions, received "API_DISABLED_FOR_ORG" error code') + if sf.api_type == "BULK" and not Bulk(sf).has_permissions(): + raise TapSalesforceBulkAPIDisabledException( + 'This client does not have Bulk API permissions, received "API_DISABLED_FOR_ORG" error code' + ) objects_list = sorted(objects_to_discover) start_counter = 0 concurrency_limit = 25 while start_counter < len(objects_list): - end_counter = start_counter + concurrency_limit + end_counter = start_counter + concurrency_limit if end_counter >= len(objects_list): end_counter = len(objects_list) chunk = objects_list[start_counter:end_counter] - chunk_args = [[sf, sobject_name, entries, sf_custom_setting_objects, object_to_tag_references] for sobject_name in chunk] + chunk_args = [ + [ + sf, + sobject_name, + entries, + sf_custom_setting_objects, + object_to_tag_references, + ] + for sobject_name in chunk] run_concurrently(discover_stream, chunk_args) start_counter = end_counter @@ -357,8 +373,6 @@ def do_discover(sf): or sobject_name.endswith("ChangeEvent"): continue - # append - sobject_description = sf.describe(sobject_name) if sobject_description is None: From edb001321d13c167b9487993459c3d5768d763ce Mon Sep 17 00:00:00 2001 From: Key Date: Tue, 12 Mar 2024 08:20:04 -0500 Subject: [PATCH 3/3] delete old loop --- tap_salesforce/__init__.py | 39 -------------------------------------- 1 file changed, 39 deletions(-) diff --git a/tap_salesforce/__init__.py b/tap_salesforce/__init__.py index 2e480f3a..a3324c85 100644 --- a/tap_salesforce/__init__.py +++ b/tap_salesforce/__init__.py @@ -364,45 +364,6 @@ def do_discover(sf): for sobject_name in chunk] run_concurrently(discover_stream, chunk_args) start_counter = end_counter - - for sobject_name in sorted(objects_to_discover): - - # Skip blacklisted SF objects depending on the api_type in use - # ChangeEvent objects are not queryable via Bulk or REST (undocumented) - if (sobject_name in sf.get_blacklisted_objects() and sobject_name not in ACTIVITY_STREAMS) \ - or sobject_name.endswith("ChangeEvent"): - continue - - sobject_description = sf.describe(sobject_name) - - if sobject_description is None: - continue - - # Cache customSetting and Tag objects to check for blacklisting after - # all objects have been described - if sobject_description.get("customSetting"): - sf_custom_setting_objects.append(sobject_name) - elif sobject_name.endswith("__Tag"): - relationship_field = next( - (f for f in sobject_description["fields"] if f.get("relationshipName") == "Item"), - None) - if relationship_field: - # Map {"Object":"Object__Tag"} - object_to_tag_references[relationship_field["referenceTo"] - [0]] = sobject_name - - fields = sobject_description['fields'] - replication_key = get_replication_key(sobject_name, fields) - - # Salesforce Objects are skipped when they do not have an Id field - if not [f["name"] for f in fields if f["name"]=="Id"]: - LOGGER.info( - "Skipping Salesforce Object %s, as it has no Id field", - sobject_name) - continue - - entry = generate_schema(fields, sf, sobject_name, replication_key) - entries.append(entry) # Handle ListViews views = get_views_list(sf)