Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add concurrency to the discover #29

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 96 additions & 41 deletions tap_salesforce/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from tap_salesforce.salesforce.bulk import Bulk
from tap_salesforce.salesforce.exceptions import (
TapSalesforceException, TapSalesforceQuotaExceededException, TapSalesforceBulkAPIDisabledException)
from concurrent.futures import ThreadPoolExecutor, as_completed

LOGGER = singer.get_logger()

Expand Down Expand Up @@ -252,14 +253,83 @@ def get_views_list(sf):

return responses

def discover_stream(
    sf, sobject_name, entries, sf_custom_setting_objects, object_to_tag_references
):
    """Describe one Salesforce object and append its catalog entry to ``entries``.

    Designed to be run concurrently (see ``run_concurrently``): results are
    communicated by mutating the shared ``entries`` list,
    ``sf_custom_setting_objects`` list and ``object_to_tag_references`` dict
    (list.append and dict item assignment are atomic in CPython).

    Args:
        sf: Salesforce client exposing ``describe`` and
            ``get_blacklisted_objects``.
        sobject_name: API name of the Salesforce object to describe.
        entries: shared output list of generated catalog entries.
        sf_custom_setting_objects: shared list collecting customSetting
            object names for later blacklist checks.
        object_to_tag_references: shared dict mapping an object name to its
            ``__Tag`` object name.

    Returns:
        None. Objects that are blacklisted, undescribable, ChangeEvents, or
        lack an ``Id`` field are skipped silently (the Id case is logged).
    """
    # Skip blacklisted SF objects depending on the api_type in use.
    # ChangeEvent objects are not queryable via Bulk or REST (undocumented).
    if (
        sobject_name in sf.get_blacklisted_objects()
        and sobject_name not in ACTIVITY_STREAMS
    ) or sobject_name.endswith("ChangeEvent"):
        return

    sobject_description = sf.describe(sobject_name)

    if sobject_description is None:
        return

    # Cache customSetting and Tag objects to check for blacklisting after
    # all objects have been described.
    if sobject_description.get("customSetting"):
        sf_custom_setting_objects.append(sobject_name)
    elif sobject_name.endswith("__Tag"):
        relationship_field = next(
            (
                f
                for f in sobject_description["fields"]
                if f.get("relationshipName") == "Item"
            ),
            None,
        )
        # Guard against a missing or empty referenceTo list to avoid an
        # IndexError on malformed describe output.
        if relationship_field and relationship_field.get("referenceTo"):
            # Map {"Object":"Object__Tag"}
            object_to_tag_references[
                relationship_field["referenceTo"][0]
            ] = sobject_name

    fields = sobject_description["fields"]
    replication_key = get_replication_key(sobject_name, fields)

    # Salesforce Objects are skipped when they do not have an Id field.
    if not any(f["name"] == "Id" for f in fields):
        LOGGER.info(
            "Skipping Salesforce Object %s, as it has no Id field", sobject_name
        )
        return

    entry = generate_schema(fields, sf, sobject_name, replication_key)
    entries.append(entry)


def run_concurrently(fn, fn_args_list):
    """Call ``fn`` once per argument sequence using a thread pool.

    Equivalent to ``[fn(*args) for args in fn_args_list]`` — results are
    returned in input order — but the calls run concurrently on up to five
    worker threads, which overlaps the HTTP round-trips of the describe
    requests issued during discovery.

    Args:
        fn: callable to invoke.
        fn_args_list: iterable of positional-argument sequences, one per call.

    Returns:
        list: results of each call, ordered to match ``fn_args_list``.
    """
    # Materialize so we can both enumerate it and preallocate the result list.
    fn_args_list = list(fn_args_list)

    # Tag each call with the index of its input: as_completed yields futures
    # in COMPLETION order, not submission order, so the index is the only
    # reliable way to restore input ordering.
    def fn_with_index(index, *fn_args):
        return (index, fn(*fn_args))

    all_tasks = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        for index, fn_args in enumerate(fn_args_list):
            all_tasks.append(executor.submit(fn_with_index, index, *fn_args))

    # Preallocate and assign by index. The previous list.insert(index, ...)
    # approach scrambled results whenever futures completed out of order
    # (inserting at an index beyond the current length simply appends).
    results = [None] * len(fn_args_list)

    for future in as_completed(all_tasks):
        index, result = future.result()
        results[index] = result

    return results

# pylint: disable=too-many-branches,too-many-statements
def do_discover(sf):
"""Describes a Salesforce instance's objects and generates a JSON schema for each field."""
global_description = sf.describe()

objects_to_discover = {o['name'] for o in global_description['sobjects']}
objects_to_discover = {o["name"] for o in global_description["sobjects"]}

sf_custom_setting_objects = []
object_to_tag_references = {}
Expand All @@ -268,47 +338,32 @@ def do_discover(sf):
entries = []

# Check if the user has BULK API enabled
if sf.api_type == 'BULK' and not Bulk(sf).has_permissions():
raise TapSalesforceBulkAPIDisabledException('This client does not have Bulk API permissions, received "API_DISABLED_FOR_ORG" error code')

for sobject_name in sorted(objects_to_discover):

# Skip blacklisted SF objects depending on the api_type in use
# ChangeEvent objects are not queryable via Bulk or REST (undocumented)
if (sobject_name in sf.get_blacklisted_objects() and sobject_name not in ACTIVITY_STREAMS) \
or sobject_name.endswith("ChangeEvent"):
continue

sobject_description = sf.describe(sobject_name)

if sobject_description is None:
continue

# Cache customSetting and Tag objects to check for blacklisting after
# all objects have been described
if sobject_description.get("customSetting"):
sf_custom_setting_objects.append(sobject_name)
elif sobject_name.endswith("__Tag"):
relationship_field = next(
(f for f in sobject_description["fields"] if f.get("relationshipName") == "Item"),
None)
if relationship_field:
# Map {"Object":"Object__Tag"}
object_to_tag_references[relationship_field["referenceTo"]
[0]] = sobject_name

fields = sobject_description['fields']
replication_key = get_replication_key(sobject_name, fields)

# Salesforce Objects are skipped when they do not have an Id field
if not [f["name"] for f in fields if f["name"]=="Id"]:
LOGGER.info(
"Skipping Salesforce Object %s, as it has no Id field",
sobject_name)
continue
if sf.api_type == "BULK" and not Bulk(sf).has_permissions():
raise TapSalesforceBulkAPIDisabledException(
'This client does not have Bulk API permissions, received "API_DISABLED_FOR_ORG" error code'
)

entry = generate_schema(fields, sf, sobject_name, replication_key)
entries.append(entry)
objects_list = sorted(objects_to_discover)
start_counter = 0
concurrency_limit = 25

while start_counter < len(objects_list):
end_counter = start_counter + concurrency_limit
if end_counter >= len(objects_list):
end_counter = len(objects_list)

chunk = objects_list[start_counter:end_counter]
chunk_args = [
[
sf,
sobject_name,
entries,
sf_custom_setting_objects,
object_to_tag_references,
]
for sobject_name in chunk]
run_concurrently(discover_stream, chunk_args)
start_counter = end_counter
Comment on lines +346 to +366

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
objects_list = sorted(objects_to_discover)
start_counter = 0
concurrency_limit = 25
while start_counter < len(objects_list):
end_counter = start_counter + concurrency_limit
if end_counter >= len(objects_list):
end_counter = len(objects_list)
chunk = objects_list[start_counter:end_counter]
chunk_args = [
[
sf,
sobject_name,
entries,
sf_custom_setting_objects,
object_to_tag_references,
]
for sobject_name in chunk]
run_concurrently(discover_stream, chunk_args)
start_counter = end_counter
objects_list = sorted(objects_to_discover)
concurrency_limit = 25
for start_counter in range(0, len(objects_list), concurrency_limit):
chunk = objects_list[start_counter:start_counter + concurrency_limit]
chunk_args = [
(sf, sobject_name, entries, sf_custom_setting_objects, object_to_tag_references)
for sobject_name in chunk
]
run_concurrently(discover_stream, chunk_args)

This version seems a little clearer.


# Handle ListViews
views = get_views_list(sf)
Expand Down