From 3e963143196eac9b690b04ff822a64616f468068 Mon Sep 17 00:00:00 2001 From: Renan Butkeraites Date: Wed, 25 Sep 2024 19:56:58 -0300 Subject: [PATCH 1/6] Paralellize streams sync --- tap_salesforce/__init__.py | 335 ++++++++++++++++++++++--------------- tap_salesforce/sync.py | 273 +++++++++++++++--------------- 2 files changed, 339 insertions(+), 269 deletions(-) diff --git a/tap_salesforce/__init__.py b/tap_salesforce/__init__.py index 58545259..940a6275 100644 --- a/tap_salesforce/__init__.py +++ b/tap_salesforce/__init__.py @@ -12,6 +12,10 @@ from tap_salesforce.salesforce.exceptions import ( TapSalesforceException, TapSalesforceQuotaExceededException, TapSalesforceBulkAPIDisabledException) +import multiprocessing +from functools import partial + + LOGGER = singer.get_logger() REQUIRED_CONFIG_KEYS = ['refresh_token', @@ -52,40 +56,39 @@ def get_replication_key(sobject_name, fields): def stream_is_selected(mdata): return mdata.get((), {}).get('selected', False) -def build_state(raw_state, catalog): +def build_state(raw_state, catalog_entry): state = {} - for catalog_entry in catalog['streams']: - tap_stream_id = catalog_entry['tap_stream_id'] - catalog_metadata = metadata.to_map(catalog_entry['metadata']) - replication_method = catalog_metadata.get((), {}).get('replication-method') - - version = singer.get_bookmark(raw_state, - tap_stream_id, - 'version') - - # Preserve state that deals with resuming an incomplete bulk job - if singer.get_bookmark(raw_state, tap_stream_id, 'JobID'): - job_id = singer.get_bookmark(raw_state, tap_stream_id, 'JobID') - batches = singer.get_bookmark(raw_state, tap_stream_id, 'BatchIDs') - current_bookmark = singer.get_bookmark(raw_state, tap_stream_id, 'JobHighestBookmarkSeen') - state = singer.write_bookmark(state, tap_stream_id, 'JobID', job_id) - state = singer.write_bookmark(state, tap_stream_id, 'BatchIDs', batches) - state = singer.write_bookmark(state, tap_stream_id, 'JobHighestBookmarkSeen', current_bookmark) - - if replication_method == 'INCREMENTAL': - replication_key = catalog_metadata.get((), {}).get('replication-key') - replication_key_value = singer.get_bookmark(raw_state, - tap_stream_id, - replication_key) - if version is not None: - state = singer.write_bookmark( - state, tap_stream_id, 'version', version) - if replication_key_value is not None: - state = singer.write_bookmark( - state, tap_stream_id, replication_key, replication_key_value) - elif replication_method == 'FULL_TABLE' and version is None: - state = singer.write_bookmark(state, tap_stream_id, 'version', version) + tap_stream_id = catalog_entry['tap_stream_id'] + catalog_metadata = metadata.to_map(catalog_entry['metadata']) + replication_method = catalog_metadata.get((), {}).get('replication-method') + + version = singer.get_bookmark(raw_state, + tap_stream_id, + 'version') + + # Preserve state that deals with resuming an incomplete bulk job + if singer.get_bookmark(raw_state, tap_stream_id, 'JobID'): + job_id = singer.get_bookmark(raw_state, tap_stream_id, 'JobID') + batches = singer.get_bookmark(raw_state, tap_stream_id, 'BatchIDs') + current_bookmark = singer.get_bookmark(raw_state, tap_stream_id, 'JobHighestBookmarkSeen') + state = singer.write_bookmark(state, tap_stream_id, 'JobID', job_id) + state = singer.write_bookmark(state, tap_stream_id, 'BatchIDs', batches) + state = singer.write_bookmark(state, tap_stream_id, 'JobHighestBookmarkSeen', current_bookmark) + + if replication_method == 'INCREMENTAL': + replication_key = catalog_metadata.get((), {}).get('replication-key') 
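The per-entry `build_state` above is what lets each worker derive only the bookmark slice for the stream it is syncing. For illustration, a minimal, self-contained sketch of the fan-out this refactor enables with `multiprocessing.Pool` and `functools.partial`, the same pattern the later hunks wire up in `main_impl`; `sync_one_stream`, `sync_all_streams`, and the demo catalog are hypothetical stand-ins for `process_catalog_entry`, `do_sync`, and the real catalog, not code from the tap:

```python
import multiprocessing
from functools import partial


def sync_one_stream(catalog_entry, raw_state, config):
    # Hypothetical worker: in the tap this role is played by
    # process_catalog_entry, which re-creates its own Salesforce client,
    # calls build_state(raw_state, catalog_entry) for just this stream,
    # and then runs do_sync on it.
    stream_id = catalog_entry["tap_stream_id"]
    bookmarks = raw_state.get("bookmarks", {}).get(stream_id, {})
    return stream_id, sorted(bookmarks)


def sync_all_streams(catalog, raw_state, config, processes=8):
    # partial() freezes the shared arguments so Pool.map only has to
    # distribute the per-stream catalog entries across worker processes.
    worker = partial(sync_one_stream, raw_state=raw_state, config=config)
    with multiprocessing.Pool(processes=processes) as pool:
        return pool.map(worker, catalog["streams"])


if __name__ == "__main__":
    demo_catalog = {"streams": [{"tap_stream_id": "Account"},
                                {"tap_stream_id": "Contact"}]}
    demo_state = {"bookmarks": {"Account": {"SystemModstamp": "2024-01-01T00:00:00Z"}}}
    print(sync_all_streams(demo_catalog, demo_state, config={}))
```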
+ replication_key_value = singer.get_bookmark(raw_state, + tap_stream_id, + replication_key) + if version is not None: + state = singer.write_bookmark( + state, tap_stream_id, 'version', version) + if replication_key_value is not None: + state = singer.write_bookmark( + state, tap_stream_id, replication_key, replication_key_value) + elif replication_method == 'FULL_TABLE' and version is None: + state = singer.write_bookmark(state, tap_stream_id, 'version', version) return state @@ -397,7 +400,7 @@ def do_discover(sf): result = {'streams': entries} json.dump(result, sys.stdout, indent=4) -def do_sync(sf, catalog, state,config=None): +def do_sync(sf, catalog_entry, state, catalog,config=None): input_state = state.copy() starting_stream = state.get("current_stream") @@ -405,112 +408,131 @@ def do_sync(sf, catalog, state,config=None): LOGGER.info("Resuming sync from %s", starting_stream) else: LOGGER.info("Starting sync") - catalog = prepare_reports_streams(catalog) - # Set ListView as first stream to sync to avoid issues with replication-keys - list_view = [c for c in catalog["streams"] if c["stream"]=="ListView"] - catalog["streams"] = [c for c in catalog["streams"] if c["stream"]!="ListView"] - catalog["streams"] = list_view + catalog["streams"] + stream_version = get_stream_version(catalog_entry, state) + stream = catalog_entry['stream'] + stream_alias = catalog_entry.get('stream_alias') + stream_name = catalog_entry["tap_stream_id"].replace("/","_") + activate_version_message = singer.ActivateVersionMessage( + stream=(stream_alias or stream.replace("/","_")), version=stream_version) - # Sync Streams - for catalog_entry in catalog["streams"]: - stream_version = get_stream_version(catalog_entry, state) - stream = catalog_entry['stream'] - stream_alias = catalog_entry.get('stream_alias') - stream_name = catalog_entry["tap_stream_id"].replace("/","_") - activate_version_message = singer.ActivateVersionMessage( - stream=(stream_alias or stream.replace("/","_")), version=stream_version) + catalog_metadata = metadata.to_map(catalog_entry['metadata']) + replication_key = catalog_metadata.get((), {}).get('replication-key') - catalog_metadata = metadata.to_map(catalog_entry['metadata']) - replication_key = catalog_metadata.get((), {}).get('replication-key') + mdata = metadata.to_map(catalog_entry['metadata']) - mdata = metadata.to_map(catalog_entry['metadata']) + if not stream_is_selected(mdata): + LOGGER.info("%s: Skipping - not selected", stream_name) + return - if not stream_is_selected(mdata): - LOGGER.info("%s: Skipping - not selected", stream_name) - continue - - if starting_stream: - if starting_stream == stream_name: - LOGGER.info("%s: Resuming", stream_name) - starting_stream = None - else: - LOGGER.info("%s: Skipping - already synced", stream_name) - continue - else: - LOGGER.info("%s: Starting", stream_name) - - state["current_stream"] = stream_name - singer.write_state(state) - key_properties = metadata.to_map(catalog_entry['metadata']).get((), {}).get('table-key-properties') - singer.write_schema( - stream.replace("/","_"), - catalog_entry['schema'], - key_properties, - replication_key, - stream_alias) - - job_id = singer.get_bookmark(state, catalog_entry['tap_stream_id'], 'JobID') - if job_id: - with metrics.record_counter(stream) as counter: - LOGGER.info("Found JobID from previous Bulk Query. 
Resuming sync for job: %s", job_id) - # Resuming a sync should clear out the remaining state once finished - counter = resume_syncing_bulk_query(sf, catalog_entry, job_id, state, counter) - LOGGER.info("%s: Completed sync (%s rows)", stream_name, counter.value) - # Remove Job info from state once we complete this resumed query. One of a few cases could have occurred: - # 1. The job succeeded, in which case make JobHighestBookmarkSeen the new bookmark - # 2. The job partially completed, in which case make JobHighestBookmarkSeen the new bookmark, or - # existing bookmark if no bookmark exists for the Job. - # 3. The job completely failed, in which case maintain the existing bookmark, or None if no bookmark - state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).pop('JobID', None) - state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).pop('BatchIDs', None) - bookmark = state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}) \ - .pop('JobHighestBookmarkSeen', None) - existing_bookmark = state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}) \ - .pop(replication_key, None) - state = singer.write_bookmark( - state, - catalog_entry['tap_stream_id'], - replication_key, - bookmark or existing_bookmark) # If job is removed, reset to existing bookmark or None - singer.write_state(state) + if starting_stream: + if starting_stream == stream_name: + LOGGER.info("%s: Resuming", stream_name) + starting_stream = None else: - # Tables with a replication_key or an empty bookmark will emit an - # activate_version at the beginning of their sync - bookmark_is_empty = state.get('bookmarks', {}).get( - catalog_entry['tap_stream_id']) is None - - if "/" in state["current_stream"]: - # get current name - old_key = state["current_stream"] - # get the new key name - new_key = old_key.replace("/","_") - state["current_stream"] = new_key - - catalog_entry['tap_stream_id'] = catalog_entry['tap_stream_id'].replace("/","_") - if replication_key or bookmark_is_empty: - singer.write_message(activate_version_message) - state = singer.write_bookmark(state, - catalog_entry['tap_stream_id'], - 'version', - stream_version) - counter = sync_stream(sf, catalog_entry, state, input_state, catalog,config) + LOGGER.info("%s: Skipping - already synced", stream_name) + return + else: + LOGGER.info("%s: Starting", stream_name) + + state["current_stream"] = stream_name + singer.write_state(state) + key_properties = metadata.to_map(catalog_entry['metadata']).get((), {}).get('table-key-properties') + singer.write_schema( + stream.replace("/","_"), + catalog_entry['schema'], + key_properties, + replication_key, + stream_alias) + + job_id = singer.get_bookmark(state, catalog_entry['tap_stream_id'], 'JobID') + if job_id: + with metrics.record_counter(stream) as counter: + LOGGER.info("Found JobID from previous Bulk Query. Resuming sync for job: %s", job_id) + # Resuming a sync should clear out the remaining state once finished + counter = resume_syncing_bulk_query(sf, catalog_entry, job_id, state, counter) LOGGER.info("%s: Completed sync (%s rows)", stream_name, counter.value) + # Remove Job info from state once we complete this resumed query. One of a few cases could have occurred: + # 1. The job succeeded, in which case make JobHighestBookmarkSeen the new bookmark + # 2. The job partially completed, in which case make JobHighestBookmarkSeen the new bookmark, or + # existing bookmark if no bookmark exists for the Job. + # 3. 
The job completely failed, in which case maintain the existing bookmark, or None if no bookmark + state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).pop('JobID', None) + state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).pop('BatchIDs', None) + bookmark = state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}) \ + .pop('JobHighestBookmarkSeen', None) + existing_bookmark = state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}) \ + .pop(replication_key, None) + state = singer.write_bookmark( + state, + catalog_entry['tap_stream_id'], + replication_key, + bookmark or existing_bookmark) # If job is removed, reset to existing bookmark or None + singer.write_state(state) + else: + # Tables with a replication_key or an empty bookmark will emit an + # activate_version at the beginning of their sync + bookmark_is_empty = state.get('bookmarks', {}).get( + catalog_entry['tap_stream_id']) is None + + if "/" in state["current_stream"]: + # get current name + old_key = state["current_stream"] + # get the new key name + new_key = old_key.replace("/","_") + state["current_stream"] = new_key + + catalog_entry['tap_stream_id'] = catalog_entry['tap_stream_id'].replace("/","_") + if replication_key or bookmark_is_empty: + singer.write_message(activate_version_message) + state = singer.write_bookmark(state, + catalog_entry['tap_stream_id'], + 'version', + stream_version) + counter = sync_stream(sf, catalog_entry, state, input_state, catalog, config) + LOGGER.info("%s: Completed sync (%s rows)", stream_name, counter.value) state["current_stream"] = None singer.write_state(state) LOGGER.info("Finished sync") +def process_catalog_entry(catalog_entry, sf_data, state, catalog, config): + # Reinitialize Salesforce object in the child process using parent's session + sf = Salesforce( + refresh_token=sf_data['refresh_token'], # Still keep refresh_token + sf_client_id=sf_data['client_id'], + sf_client_secret=sf_data['client_secret'], + quota_percent_total=sf_data.get('quota_percent_total'), + quota_percent_per_run=sf_data.get('quota_percent_per_run'), + is_sandbox=sf_data.get('is_sandbox'), + select_fields_by_default=sf_data.get('select_fields_by_default'), + default_start_date=sf_data.get('start_date'), + api_type=sf_data.get('api_type'), + list_reports=sf_data.get('list_reports'), + list_views=sf_data.get('list_views'), + api_version=sf_data.get('api_version') + ) + + # No need to log in again; set the session directly + sf.access_token = sf_data['access_token'] + sf.instance_url = sf_data['instance_url'] + + state = {key: value for key, value in build_state(state, catalog_entry).items()} + LOGGER.info(f"Processing stream: {catalog_entry}") + do_sync(sf, catalog_entry, state, catalog, config) + + def main_impl(): args = singer_utils.parse_args(REQUIRED_CONFIG_KEYS) CONFIG.update(args.config) - sf = None is_sandbox = ( CONFIG.get("base_uri") == "https://test.salesforce.com" if CONFIG.get("base_uri") else CONFIG.get("is_sandbox") ) + CONFIG["is_sandbox"] = is_sandbox + try: sf = Salesforce( refresh_token=CONFIG['refresh_token'], @@ -525,27 +547,66 @@ def main_impl(): list_reports=CONFIG.get('list_reports'), list_views=CONFIG.get('list_views'), api_version=CONFIG.get('api_version') - ) + ) sf.login() + if sf.login_timer: + sf.login_timer.cancel() # Ensure the login timer is cancelled if needed + except Exception as e: + raise e + + if not sf: + return + + if args.discover: + do_discover(sf) + return + + if not args.properties: + return + + catalog = 
prepare_reports_streams(args.properties) + + list_view = [c for c in catalog["streams"] if c["stream"] == "ListView"] + catalog["streams"] = [c for c in catalog["streams"] if c["stream"] != "ListView"] + catalog["streams"] = list_view + catalog["streams"] + + # Create a dictionary with session details to pass to child processes + sf_data = { + 'access_token': sf.access_token, + 'instance_url': sf.instance_url, + 'refresh_token': CONFIG['refresh_token'], + 'client_id': CONFIG['client_id'], + 'client_secret': CONFIG['client_secret'], + 'quota_percent_total': CONFIG.get('quota_percent_total'), + 'quota_percent_per_run': CONFIG.get('quota_percent_per_run'), + 'is_sandbox': is_sandbox, + 'select_fields_by_default': CONFIG.get('select_fields_by_default'), + 'start_date': CONFIG.get('start_date'), + 'api_type': CONFIG.get('api_type'), + 'list_reports': CONFIG.get('list_reports'), + 'list_views': CONFIG.get('list_views'), + 'api_version': CONFIG.get('api_version'), + } - if args.discover: - do_discover(sf) - elif args.properties: - catalog = args.properties - state = build_state(args.state, catalog) - do_sync(sf, catalog, state,CONFIG) - finally: - if sf: - if sf.rest_requests_attempted > 0: - LOGGER.debug( - "This job used %s REST requests towards the Salesforce quota.", - sf.rest_requests_attempted) - if sf.jobs_completed > 0: - LOGGER.debug( - "Replication used %s Bulk API jobs towards the Salesforce quota.", - sf.jobs_completed) - if sf.login_timer: - sf.login_timer.cancel() + # Use multiprocessing to process the catalog entries in parallel + with multiprocessing.Manager() as manager: + managed_state = manager.dict(args.state) # Shared state + + # Create a partial function with shared session and config + process_func = partial(process_catalog_entry, sf_data=sf_data, state=managed_state, catalog=catalog, config=CONFIG) + + # Parallel execution using multiprocessing.Pool + with multiprocessing.Pool(processes=8) as pool: + pool.map(process_func, catalog["streams"]) + + if sf.rest_requests_attempted > 0: + LOGGER.debug( + "This job used %s REST requests towards the Salesforce quota.", + sf.rest_requests_attempted) + if sf.jobs_completed > 0: + LOGGER.debug( + "Replication used %s Bulk API jobs towards the Salesforce quota.", + sf.jobs_completed) def prepare_reports_streams(catalog): streams = catalog["streams"] diff --git a/tap_salesforce/sync.py b/tap_salesforce/sync.py index 08210674..92f9586d 100644 --- a/tap_salesforce/sync.py +++ b/tap_salesforce/sync.py @@ -248,128 +248,63 @@ def sync_records(sf, catalog_entry, state, input_state, counter, catalog,config= singer_utils.strftime(chunked_bookmark)) if catalog_entry["stream"].startswith("Report_"): - report_name = catalog_entry["stream"].split("Report_", 1)[1] + sync_report_streams(sf, catalog_entry, stream, schema, stream_alias, stream_version, start_time) + return + if "ListViews" == catalog_entry["stream"]: + sync_list_views_stream(sf, catalog_entry, state, input_state, catalog, replication_key, start_time) + return - reports = [] - done = False - headers = sf._get_standard_headers() - endpoint = "queryAll" - params = {'q': 'SELECT Id,DeveloperName FROM Report'} - url = sf.data_url.format(sf.instance_url, endpoint) - - while not done: - response = sf._make_request('GET', url, headers=headers, params=params) - response_json = response.json() - done = response_json.get("done") - reports.extend(response_json.get("records", [])) - if not done: - url = sf.instance_url+response_json.get("nextRecordsUrl") - - report = [r for r in reports if 
report_name==r["DeveloperName"]][0] - report_id = report["Id"] - - endpoint = f"analytics/reports/{report_id}" - url = sf.data_url.format(sf.instance_url, endpoint) - response = sf._make_request('GET', url, headers=headers) + query_response = sf.query(catalog_entry, state) + if catalog_entry["stream"] in ACTIVITY_STREAMS: + start_date_str = sf.get_start_date(state, catalog_entry) + start_date = singer_utils.strptime_with_tz(start_date_str) + start_date = singer_utils.strftime(start_date) - with Transformer(pre_hook=transform_bulk_data_hook) as transformer: - rec = transformer.transform(response.json(), schema) - rec = fix_record_anytype(rec, schema) - stream = stream.replace("/","_") - singer.write_message( - singer.RecordMessage( - stream=( - stream_alias or stream), - record=rec, - version=stream_version, - time_extracted=start_time)) - - elif "ListViews" == catalog_entry["stream"]: - headers = sf._get_standard_headers() - endpoint = "queryAll" - - params = {'q': f'SELECT Name,Id,SobjectType,DeveloperName FROM ListView'} - url = sf.data_url.format(sf.instance_url, endpoint) - response = sf._make_request('GET', url, headers=headers, params=params) - - Id_Sobject = [{"Id":r["Id"],"SobjectType": r["SobjectType"],"DeveloperName":r["DeveloperName"],"Name":r["Name"]} - for r in response.json().get('records',[]) if r["Name"]] - - selected_lists_names = [] - for ln in catalog_entry.get("metadata",[])[:-1]: - if ln.get("metadata",[])['selected']: - selected_list = ln.get('breadcrumb',[])[1] - for isob in Id_Sobject: - if selected_list==f"ListView_{isob['SobjectType']}_{isob['DeveloperName']}": - selected_lists_names.append(isob) - - replication_key_value = replication_key and singer_utils.strptime_with_tz(rec[replication_key]) - - for list_info in selected_lists_names: - - sobject = list_info['SobjectType'] - lv_name = list_info['DeveloperName'] - lv_id = list_info['Id'] - - lv_catalog = [x for x in catalog["streams"] if x["stream"] == sobject] + selected_properties = sf._get_selected_properties(catalog_entry) - if lv_catalog: - lv_catalog_entry = lv_catalog[0].copy() - try: - handle_ListView(sf,lv_id,sobject,lv_name,lv_catalog_entry,state,input_state,start_time) - except RequestException as e: - LOGGER.warning(f"No existing /'results/' endpoint was found for SobjectType:{sobject}, Id:{lv_id}") + query_map = { + "ActivityHistory": "ActivityHistories", + "OpenActivity": "OpenActivities" + } - else: - if catalog_entry["stream"] in ACTIVITY_STREAMS: - start_date_str = sf.get_start_date(state, catalog_entry) - start_date = singer_utils.strptime_with_tz(start_date_str) - start_date = singer_utils.strftime(start_date) + query_field = query_map[catalog_entry['stream']] - selected_properties = sf._get_selected_properties(catalog_entry) + query = "SELECT {} FROM {}".format(",".join(selected_properties), query_field) + query = f"SELECT ({query}) FROM Contact" - query_map = { - "ActivityHistory": "ActivityHistories", - "OpenActivity": "OpenActivities" - } + catalog_metadata = metadata.to_map(catalog_entry['metadata']) + replication_key = catalog_metadata.get((), {}).get('replication-key') - query_field = query_map[catalog_entry['stream']] + order_by = "" + if replication_key: + where_clause = " WHERE {} > {} ".format( + replication_key, + start_date) + order_by = " ORDER BY {} ASC".format(replication_key) + query = query + where_clause + order_by - query = "SELECT {} FROM {}".format(",".join(selected_properties), query_field) - query = f"SELECT ({query}) FROM Contact" + def unwrap_query(query_response, 
query_field): + for q in query_response: + if q.get(query_field): + for f in q[query_field]["records"]: + yield f - catalog_metadata = metadata.to_map(catalog_entry['metadata']) - replication_key = catalog_metadata.get((), {}).get('replication-key') + query_response = sf.query(catalog_entry, state, query_override=query) + query_response = unwrap_query(query_response, query_field) - order_by = "" - if replication_key: - where_clause = " WHERE {} > {} ".format( - replication_key, - start_date) - order_by = " ORDER BY {} ASC".format(replication_key) - query = query + where_clause + order_by - - def unwrap_query(query_response, query_field): - for q in query_response: - if q.get(query_field): - for f in q[query_field]["records"]: - yield f - - query_response = sf.query(catalog_entry, state, query_override=query) - query_response = unwrap_query(query_response, query_field) - else: - query_response = sf.query(catalog_entry, state) + sync_others(sf, catalog_entry, state, input_state, counter, catalog, download_files, chunked_bookmark, stream, schema, stream_alias, replication_key, stream_version, start_time, query_response) - for rec in query_response: - counter.increment() - with Transformer(pre_hook=transform_bulk_data_hook) as transformer: - rec = transformer.transform(rec, schema) - rec = fix_record_anytype(rec, schema) - if stream=='ContentVersion': - if "IsLatest" in rec: - if rec['IsLatest']==True and download_files==True: - rec['TextPreview'] = base64.b64encode(get_content_document_file(sf,rec['Id'])).decode('utf-8') - singer.write_message( +def sync_others(sf, catalog_entry, state, input_state, counter, catalog, download_files, chunked_bookmark, stream, schema, stream_alias, replication_key, stream_version, start_time, query_response): + for rec in query_response: + counter.increment() + with Transformer(pre_hook=transform_bulk_data_hook) as transformer: + rec = transformer.transform(rec, schema) + rec = fix_record_anytype(rec, schema) + if stream=='ContentVersion': + if "IsLatest" in rec: + if rec['IsLatest']==True and download_files==True: + rec['TextPreview'] = base64.b64encode(get_content_document_file(sf,rec['Id'])).decode('utf-8') + singer.write_message( singer.RecordMessage( stream=( stream_alias or stream), @@ -377,41 +312,115 @@ def unwrap_query(query_response, query_field): version=stream_version, time_extracted=start_time)) - replication_key_value = replication_key and singer_utils.strptime_with_tz(rec[replication_key]) + replication_key_value = replication_key and singer_utils.strptime_with_tz(rec[replication_key]) - if sf.pk_chunking: - if replication_key_value and replication_key_value <= start_time and replication_key_value > chunked_bookmark: + if sf.pk_chunking: + if replication_key_value and replication_key_value <= start_time and replication_key_value > chunked_bookmark: # Replace the highest seen bookmark and save the state in case we need to resume later - chunked_bookmark = singer_utils.strptime_with_tz(rec[replication_key]) - state = singer.write_bookmark( + chunked_bookmark = singer_utils.strptime_with_tz(rec[replication_key]) + state = singer.write_bookmark( state, catalog_entry['tap_stream_id'], 'JobHighestBookmarkSeen', singer_utils.strftime(chunked_bookmark)) - singer.write_state(state) + singer.write_state(state) # Before writing a bookmark, make sure Salesforce has not given us a # record with one outside our range - elif replication_key_value and replication_key_value <= start_time: - state = singer.write_bookmark( + elif replication_key_value and 
replication_key_value <= start_time: + state = singer.write_bookmark( state, catalog_entry['tap_stream_id'], replication_key, rec[replication_key]) - singer.write_state(state) + singer.write_state(state) - selected = get_selected_streams(catalog) - if stream == "ListView" and rec.get("SobjectType") in selected and rec.get("Id") is not None: + selected = get_selected_streams(catalog) + if stream == "ListView" and rec.get("SobjectType") in selected and rec.get("Id") is not None: # Handle listview - try: - sobject = rec["SobjectType"] - lv_name = rec["DeveloperName"] - lv_catalog = [x for x in catalog["streams"] if x["stream"] == sobject] - rec_id = rec["Id"] - lv_catalog_entry = lv_catalog[0].copy() - if len(lv_catalog) > 0: - handle_ListView(sf,rec_id,sobject,lv_name,lv_catalog_entry,state,input_state,start_time) - except RequestException as e: - pass + try: + sobject = rec["SobjectType"] + lv_name = rec["DeveloperName"] + lv_catalog = [x for x in catalog["streams"] if x["stream"] == sobject] + rec_id = rec["Id"] + lv_catalog_entry = lv_catalog[0].copy() + if len(lv_catalog) > 0: + handle_ListView(sf,rec_id,sobject,lv_name,lv_catalog_entry,state,input_state,start_time) + except RequestException as e: + pass + +def sync_list_views_stream(sf, catalog_entry, state, input_state, catalog, replication_key, start_time): + headers = sf._get_standard_headers() + endpoint = "queryAll" + + params = {'q': f'SELECT Name,Id,SobjectType,DeveloperName FROM ListView'} + url = sf.data_url.format(sf.instance_url, endpoint) + response = sf._make_request('GET', url, headers=headers, params=params) + + Id_Sobject = [{"Id":r["Id"],"SobjectType": r["SobjectType"],"DeveloperName":r["DeveloperName"],"Name":r["Name"]} + for r in response.json().get('records',[]) if r["Name"]] + + selected_lists_names = [] + for ln in catalog_entry.get("metadata",[])[:-1]: + if ln.get("metadata",[])['selected']: + selected_list = ln.get('breadcrumb',[])[1] + for isob in Id_Sobject: + if selected_list==f"ListView_{isob['SobjectType']}_{isob['DeveloperName']}": + selected_lists_names.append(isob) + + replication_key_value = replication_key and singer_utils.strptime_with_tz(rec[replication_key]) + + for list_info in selected_lists_names: + sobject = list_info['SobjectType'] + lv_name = list_info['DeveloperName'] + lv_id = list_info['Id'] + + lv_catalog = [x for x in catalog["streams"] if x["stream"] == sobject] + + if lv_catalog: + lv_catalog_entry = lv_catalog[0].copy() + try: + handle_ListView(sf,lv_id,sobject,lv_name,lv_catalog_entry,state,input_state,start_time) + except RequestException as e: + LOGGER.warning(f"No existing /'results/' endpoint was found for SobjectType:{sobject}, Id:{lv_id}") + +def sync_report_streams(sf, catalog_entry, stream, schema, stream_alias, stream_version, start_time): + report_name = catalog_entry["stream"].split("Report_", 1)[1] + + reports = [] + done = False + headers = sf._get_standard_headers() + endpoint = "queryAll" + params = {'q': 'SELECT Id,DeveloperName FROM Report'} + url = sf.data_url.format(sf.instance_url, endpoint) + + while not done: + response = sf._make_request('GET', url, headers=headers, params=params) + response_json = response.json() + reports.extend(response_json.get("records", [])) + done = response_json.get("done") + if not done: + url = sf.instance_url+response_json.get("nextRecordsUrl") + + report = [r for r in reports if report_name==r["DeveloperName"]][0] + report_id = report["Id"] + + endpoint = f"analytics/reports/{report_id}" + url = sf.data_url.format(sf.instance_url, 
endpoint) + response = sf._make_request('GET', url, headers=headers) + + with Transformer(pre_hook=transform_bulk_data_hook) as transformer: + rec = transformer.transform(response.json(), schema) + rec = fix_record_anytype(rec, schema) + stream = stream.replace("/","_") + singer.write_message( + singer.RecordMessage( + stream=( + stream_alias or stream), + record=rec, + version=stream_version, + time_extracted=start_time)) + + return stream,rec def fix_record_anytype(rec, schema): From 4bf5776acdab7ce862784156b6f375780a524e5a Mon Sep 17 00:00:00 2001 From: Renan Butkeraites Date: Tue, 1 Oct 2024 17:05:09 -0300 Subject: [PATCH 2/6] Includs multithread support for all calls and pk chunking --- tap_salesforce/__init__.py | 147 +++++++++++------------------ tap_salesforce/salesforce/bulk.py | 148 +++++++++++++++++------------- 2 files changed, 137 insertions(+), 158 deletions(-) diff --git a/tap_salesforce/__init__.py b/tap_salesforce/__init__.py index 940a6275..e0d0cd7e 100644 --- a/tap_salesforce/__init__.py +++ b/tap_salesforce/__init__.py @@ -12,7 +12,7 @@ from tap_salesforce.salesforce.exceptions import ( TapSalesforceException, TapSalesforceQuotaExceededException, TapSalesforceBulkAPIDisabledException) -import multiprocessing +from concurrent.futures import ThreadPoolExecutor, as_completed from functools import partial @@ -267,28 +267,25 @@ def do_discover(sf): sf_custom_setting_objects = [] object_to_tag_references = {} - # For each SF Object describe it, loop its fields and build a schema entries = [] # Check if the user has BULK API enabled if sf.api_type == 'BULK' and not Bulk(sf).has_permissions(): raise TapSalesforceBulkAPIDisabledException('This client does not have Bulk API permissions, received "API_DISABLED_FOR_ORG" error code') - for sobject_name in sorted(objects_to_discover): - - # Skip blacklisted SF objects depending on the api_type in use - # ChangeEvent objects are not queryable via Bulk or REST (undocumented) + # Function to describe an object and generate its schema + def describe_and_process(sobject_name): + # Skip blacklisted SF objects if (sobject_name in sf.get_blacklisted_objects() and sobject_name not in ACTIVITY_STREAMS) \ or sobject_name.endswith("ChangeEvent"): - continue + return None sobject_description = sf.describe(sobject_name) if sobject_description is None: - continue + return None - # Cache customSetting and Tag objects to check for blacklisting after - # all objects have been described + # Cache customSetting and Tag objects if sobject_description.get("customSetting"): sf_custom_setting_objects.append(sobject_name) elif sobject_name.endswith("__Tag"): @@ -296,108 +293,63 @@ def do_discover(sf): (f for f in sobject_description["fields"] if f.get("relationshipName") == "Item"), None) if relationship_field: - # Map {"Object":"Object__Tag"} - object_to_tag_references[relationship_field["referenceTo"] - [0]] = sobject_name + object_to_tag_references[relationship_field["referenceTo"][0]] = sobject_name fields = sobject_description['fields'] replication_key = get_replication_key(sobject_name, fields) - # Salesforce Objects are skipped when they do not have an Id field - if not [f["name"] for f in fields if f["name"]=="Id"]: - LOGGER.info( - "Skipping Salesforce Object %s, as it has no Id field", - sobject_name) - continue + if not any(f["name"] == "Id" for f in fields): + LOGGER.info("Skipping Salesforce Object %s, as it has no Id field", sobject_name) + return None entry = generate_schema(fields, sf, sobject_name, replication_key) - entries.append(entry) - 
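The hunks in this region move the per-object describe calls onto a thread pool and gather schema entries as the futures finish. A self-contained sketch of that submit/as_completed pattern, with a stubbed `describe_one` standing in for the tap's `describe_and_process` (the stub, its skip rule, and the object names are illustrative only):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def describe_one(sobject_name):
    # Stand-in for describe_and_process: describe the object and build its
    # catalog entry, or return None when the object should be skipped.
    if sobject_name.endswith("ChangeEvent"):
        return None
    return {"stream": sobject_name, "schema": {"type": "object"}}


def discover(object_names):
    entries = []
    with ThreadPoolExecutor() as executor:
        futures = {executor.submit(describe_one, name): name
                   for name in sorted(object_names)}
        for future in as_completed(futures):
            entry = future.result()  # re-raises any exception from the worker
            if entry:
                entries.append(entry)
    # Futures complete in arbitrary order, so the entries are sorted at the
    # end to keep the discovery output deterministic.
    return sorted(entries, key=lambda e: e["stream"])


if __name__ == "__main__":
    print(discover(["Contact", "Account", "AccountChangeEvent"]))
```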
- # Handle ListViews - views = get_views_list(sf) + return entry - mdata = metadata.new() + # Using ThreadPoolExecutor to parallelize the describing of SF objects + with ThreadPoolExecutor() as executor: + future_to_object = {executor.submit(describe_and_process, sobject_name): sobject_name + for sobject_name in sorted(objects_to_discover)} - properties = {f"ListView_{o['SobjectType']}_{o['DeveloperName']}":dict(type=['null','object','string']) for o in views} + # Collect entries from completed futures + for future in as_completed(future_to_object): + entry = future.result() + if entry: + entries.append(entry) + # Handle ListViews + views = get_views_list(sf) + mdata = metadata.new() + properties = {f"ListView_{o['SobjectType']}_{o['DeveloperName']}": dict(type=['null', 'object', 'string']) for o in views} for name in properties.keys(): - mdata = metadata.write( - mdata,('properties',name),'selected-by-default',True - ) - - mdata = metadata.write( - mdata, - (), - 'forced-replication-method', - {'replication-method': 'FULL_TABLE'}) - + mdata = metadata.write(mdata, ('properties', name), 'selected-by-default', True) + mdata = metadata.write(mdata, (), 'forced-replication-method', {'replication-method': 'FULL_TABLE'}) mdata = metadata.write(mdata, (), 'table-key-properties', []) - - schema = { - 'type': 'object', - 'additionalProperties': False, - 'properties': properties - } - - entry = { - 'stream': "ListViews", - 'tap_stream_id': "ListViews", - 'schema': schema, - 'metadata': metadata.to_list(mdata) - } - - entries.append(entry) + schema = {'type': 'object', 'additionalProperties': False, 'properties': properties} + entries.append({'stream': "ListViews", 'tap_stream_id': "ListViews", 'schema': schema, 'metadata': metadata.to_list(mdata)}) # Handle Reports if sf.list_reports is True: reports = get_reports_list(sf) - mdata = metadata.new() properties = {} - if reports: for report in reports: field_name = f"Report_{report['DeveloperName']}" properties[field_name] = dict(type=["null", "object", "string"]) - mdata = metadata.write( - mdata, ('properties', field_name), 'selected-by-default', False) - - mdata = metadata.write( - mdata, - (), - 'forced-replication-method', - {'replication-method': 'FULL_TABLE'}) - + mdata = metadata.write(mdata, ('properties', field_name), 'selected-by-default', False) + mdata = metadata.write(mdata, (), 'forced-replication-method', {'replication-method': 'FULL_TABLE'}) mdata = metadata.write(mdata, (), 'table-key-properties', []) + schema = {'type': 'object', 'additionalProperties': False, 'properties': properties} + entries.append({'stream': "ReportList", 'tap_stream_id': "ReportList", 'schema': schema, 'metadata': metadata.to_list(mdata)}) - schema = { - 'type': 'object', - 'additionalProperties': False, - 'properties': properties - } - - entry = { - 'stream': "ReportList", - 'tap_stream_id': "ReportList", - 'schema': schema, - 'metadata': metadata.to_list(mdata) - } - - entries.append(entry) - - # For each custom setting field, remove its associated tag from entries - # See Blacklisting.md for more information - unsupported_tag_objects = [object_to_tag_references[f] - for f in sf_custom_setting_objects if f in object_to_tag_references] + # Remove unsupported tag objects + unsupported_tag_objects = [object_to_tag_references[f] for f in sf_custom_setting_objects if f in object_to_tag_references] if unsupported_tag_objects: - LOGGER.info( #pylint:disable=logging-not-lazy - "Skipping the following Tag objects, Tags on Custom Settings Salesforce objects " + - "are 
not supported by the Bulk API:") + LOGGER.info("Skipping the following Tag objects, Tags on Custom Settings Salesforce objects are not supported by the Bulk API:") LOGGER.info(unsupported_tag_objects) - entries = [e for e in entries if e['stream'] - not in unsupported_tag_objects] + entries = [e for e in entries if e['stream'] not in unsupported_tag_objects] - result = {'streams': entries} + result = {'streams': sorted(entries, key=lambda x: x['stream'])} json.dump(result, sys.stdout, indent=4) def do_sync(sf, catalog_entry, state, catalog,config=None): @@ -570,7 +522,7 @@ def main_impl(): catalog["streams"] = [c for c in catalog["streams"] if c["stream"] != "ListView"] catalog["streams"] = list_view + catalog["streams"] - # Create a dictionary with session details to pass to child processes + # Create a dictionary with session details to pass to threads sf_data = { 'access_token': sf.access_token, 'instance_url': sf.instance_url, @@ -588,16 +540,20 @@ def main_impl(): 'api_version': CONFIG.get('api_version'), } - # Use multiprocessing to process the catalog entries in parallel - with multiprocessing.Manager() as manager: - managed_state = manager.dict(args.state) # Shared state + # Use ThreadPoolExecutor to process the catalog entries in parallel using threads + with ThreadPoolExecutor() as executor: + # Partial function with shared session and config + process_func = partial(process_catalog_entry, sf_data=sf_data, state=args.state, catalog=catalog, config=CONFIG) - # Create a partial function with shared session and config - process_func = partial(process_catalog_entry, sf_data=sf_data, state=managed_state, catalog=catalog, config=CONFIG) + # Submit tasks to the executor for each stream + futures = [executor.submit(process_func, stream) for stream in catalog["streams"]] - # Parallel execution using multiprocessing.Pool - with multiprocessing.Pool(processes=8) as pool: - pool.map(process_func, catalog["streams"]) + # Optionally wait for all tasks to complete and handle exceptions + for future in futures: + try: + future.result() # This will raise any exceptions from the threads + except Exception as exc: + LOGGER.error(f"Error processing catalog entry: {exc}") if sf.rest_requests_attempted > 0: LOGGER.debug( @@ -607,6 +563,7 @@ def main_impl(): LOGGER.debug( "Replication used %s Bulk API jobs towards the Salesforce quota.", sf.jobs_completed) + def prepare_reports_streams(catalog): streams = catalog["streams"] diff --git a/tap_salesforce/salesforce/bulk.py b/tap_salesforce/salesforce/bulk.py index d8867cef..d36a2842 100644 --- a/tap_salesforce/salesforce/bulk.py +++ b/tap_salesforce/salesforce/bulk.py @@ -8,6 +8,7 @@ from singer import metrics import requests from requests.exceptions import RequestException +from concurrent.futures import ThreadPoolExecutor import xmltodict @@ -112,41 +113,37 @@ def _can_pk_chunk_job(self, failure_message): # pylint: disable=no-self-use "Failed to write query result" in failure_message def _bulk_query(self, catalog_entry, state): - job_id = self._create_job(catalog_entry) start_date = self.sf.get_start_date(state, catalog_entry) - - batch_id = self._add_batch(catalog_entry, job_id, start_date) - - self._close_job(job_id) - - batch_status = self._poll_on_batch_status(job_id, batch_id) - - if batch_status['state'] == 'Failed': - if self._can_pk_chunk_job(batch_status['stateMessage']): - batch_status = self._bulk_query_with_pk_chunking(catalog_entry, start_date) - job_id = batch_status['job_id'] - - # Set pk_chunking to True to indicate that we should write 
a bookmark differently - self.sf.pk_chunking = True - - # Add the bulk Job ID and its batches to the state so it can be resumed if necessary - tap_stream_id = catalog_entry['tap_stream_id'] - state = singer.write_bookmark(state, tap_stream_id, 'JobID', job_id) - state = singer.write_bookmark(state, tap_stream_id, 'BatchIDs', batch_status['completed'][:]) - - for completed_batch_id in batch_status['completed']: - for result in self.get_batch_results(job_id, completed_batch_id, catalog_entry): - yield result - # Remove the completed batch ID and write state - state['bookmarks'][catalog_entry['tap_stream_id']]["BatchIDs"].remove(completed_batch_id) - LOGGER.info("Finished syncing batch %s. Removing batch from state.", completed_batch_id) - LOGGER.info("Batches to go: %d", len(state['bookmarks'][catalog_entry['tap_stream_id']]["BatchIDs"])) - singer.write_state(state) - else: - raise TapSalesforceException(batch_status['stateMessage']) - else: - for result in self.get_batch_results(job_id, batch_id, catalog_entry): - yield result + batch_status = self._bulk_query_with_pk_chunking(catalog_entry, start_date) + job_id = batch_status['job_id'] + self.sf.pk_chunking = True + # Write job ID and batch state for resumption + tap_stream_id = catalog_entry['tap_stream_id'] + state = singer.write_bookmark(state, tap_stream_id, 'JobID', job_id) + state = singer.write_bookmark(state, tap_stream_id, 'BatchIDs', batch_status['completed'][:]) + + # Parallelize the batch result processing + with ThreadPoolExecutor() as executor: + futures = [ + executor.submit(self.process_batch, job_id, completed_batch_id, catalog_entry, state) + for completed_batch_id in batch_status['completed'] + ] + + # Process the results as they complete + for future in futures: + for result in future.result(): + yield result + + def process_batch(self, job_id, batch_id, catalog_entry, state): + """Process a single batch and yield results.""" + for result in self.get_batch_results(job_id, batch_id, catalog_entry): + yield result + + # Update state and log progress + state['bookmarks'][catalog_entry['tap_stream_id']]["BatchIDs"].remove(batch_id) + LOGGER.info("Finished syncing batch %s. 
Removing batch from state.", batch_id) + LOGGER.info("Batches to go: %d", len(state['bookmarks'][catalog_entry['tap_stream_id']]["BatchIDs"])) + singer.write_state(state) def _bulk_query_with_pk_chunking(self, catalog_entry, start_date): LOGGER.info("Retrying Bulk Query with PK Chunking") @@ -284,45 +281,70 @@ def _get_batch(self, job_id, batch_id): return batch['batchInfo'] def get_batch_results(self, job_id, batch_id, catalog_entry): - """Given a job_id and batch_id, queries the batches results and reads - CSV lines yielding each line as a record.""" + """Given a job_id and batch_id, queries the batch results and reads + CSV lines, yielding each line as a record.""" headers = self._get_bulk_headers() - endpoint = "job/{}/batch/{}/result".format(job_id, batch_id) + endpoint = f"job/{job_id}/batch/{batch_id}/result" url = self.bulk_url.format(self.sf.instance_url, endpoint) + # Timing the request with metrics.http_request_timer("batch_result_list") as timer: timer.tags['sobject'] = catalog_entry['stream'] batch_result_resp = self.sf._make_request('GET', url, headers=headers) - # Returns a Dict where input: - # 12 - # will return: {'result', ['1', '2']} - batch_result_list = xmltodict.parse(batch_result_resp.text, - xml_attribs=False, - force_list={'result'})['result-list'] + # Parse the result list from the XML response + batch_result_list = xmltodict.parse(batch_result_resp.text, xml_attribs=False, force_list={'result'})['result-list'] - for result in batch_result_list['result']: - endpoint = "job/{}/batch/{}/result/{}".format(job_id, batch_id, result) + # Function to fetch and process each result in parallel + def process_result(self, job_id, batch_id, result): + endpoint = f"job/{job_id}/batch/{batch_id}/result/{result}" url = self.bulk_url.format(self.sf.instance_url, endpoint) - headers['Content-Type'] = 'text/csv' - - with tempfile.NamedTemporaryFile(mode="w+", encoding="utf8") as csv_file: - resp = self.sf._make_request('GET', url, headers=headers, stream=True) - for chunk in resp.iter_content(chunk_size=ITER_CHUNK_SIZE, decode_unicode=True): - if chunk: - # Replace any NULL bytes in the chunk so it can be safely given to the CSV reader - csv_file.write(chunk.replace('\0', '')) - - csv_file.seek(0) - csv_reader = csv.reader(csv_file, - delimiter=',', - quotechar='"') - - column_name_list = next(csv_reader) - + headers = {'Content-Type': 'text/csv'} + + # Use a context manager for temporary file handling + with tempfile.NamedTemporaryFile(mode="w+", encoding="utf8", delete=False) as csv_file: + # Stream the CSV content from Salesforce Bulk API + try: + resp = self.sf._make_request('GET', url, headers=headers, stream=True) + resp.raise_for_status() # Ensure we handle errors from the request + + # Write chunks of CSV data to the temp file + for chunk in resp.iter_content(chunk_size=ITER_CHUNK_SIZE, decode_unicode=True): + if chunk: + csv_file.write(chunk.replace('\0', '')) # Replace NULL bytes + + csv_file.seek(0) # Move back to the start of the file after writing + + except requests.exceptions.RequestException as e: + # Handle any request errors (timeouts, connection errors, etc.) 
+ raise TapSalesforceException(f"Error fetching results: {str(e)}") + + # Now process the CSV file + with open(csv_file.name, mode='r', encoding='utf8') as f: + csv_reader = csv.reader(f, delimiter=',', quotechar='"') + + try: + # Read column names from the first line + column_name_list = next(csv_reader) + except StopIteration: + # Handle case where no data is returned (empty CSV) + raise TapSalesforceException(f"No data found in batch {batch_id} result.") + + # Process each row in the CSV file for line in csv_reader: - rec = dict(zip(column_name_list, line)) - yield rec + record = dict(zip(column_name_list, line)) + yield record + + # Use ThreadPoolExecutor to parallelize the processing of results + with ThreadPoolExecutor() as executor: + # Submit tasks to the executor for parallel execution + futures = [executor.submit(process_result, result) for result in batch_result_list['result']] + + # Yield the results as they complete + for future in futures: + # `future.result()` is a generator, so yield each record from it + for record in future.result(): + yield record def _close_job(self, job_id): endpoint = "job/{}".format(job_id) From afd0dc6f66840da0b285530347c6c40b0940ee11 Mon Sep 17 00:00:00 2001 From: Renan Butkeraites Date: Wed, 2 Oct 2024 09:50:24 -0300 Subject: [PATCH 3/6] Handle when list_views is not selected on the config file --- tap_salesforce/__init__.py | 50 ++++++++++++------- tap_salesforce/salesforce/bulk.py | 82 +++++++++++++++---------------- 2 files changed, 72 insertions(+), 60 deletions(-) diff --git a/tap_salesforce/__init__.py b/tap_salesforce/__init__.py index e0d0cd7e..3ad471d2 100644 --- a/tap_salesforce/__init__.py +++ b/tap_salesforce/__init__.py @@ -232,33 +232,44 @@ def get_reports_list(sf): url = sf.instance_url+response_json.get("nextRecordsUrl") return output +def process_list_view(sf, lv): + sobject = lv['SobjectType'] + lv_id = lv['Id'] + try: + sf.listview(sobject, lv_id) + return lv + except RequestException as e: + LOGGER.info(f"No /'results/' endpoint found for Sobject: {sobject}, Id: {lv_id}") + return None + def get_views_list(sf): if not sf.list_views: return [] + headers = sf._get_standard_headers() endpoint = "queryAll" params = {'q': 'SELECT Id,Name,SobjectType,DeveloperName FROM ListView'} url = sf.data_url.format(sf.instance_url, endpoint) response = sf._make_request('GET', url, headers=headers, params=params) - + + list_views = response.json().get("records", []) responses = [] - for lv in response.json().get("records", []): - sobject = lv['SobjectType'] - lv_id = lv['Id'] - try: - sf.listview(sobject,lv_id) - responses.append(lv) - except RequestException as e: - LOGGER.info(f"No /'results/' endpoint found for Sobject: {sobject}, Id: {lv_id}") + with ThreadPoolExecutor() as executor: + futures = {executor.submit(process_list_view, sf, lv): lv for lv in list_views} + + for future in as_completed(futures): + result = future.result() + if result: + responses.append(result) return responses # pylint: disable=too-many-branches,too-many-statements -def do_discover(sf): +def do_discover(sf: Salesforce): """Describes a Salesforce instance's objects and generates a JSON schema for each field.""" global_description = sf.describe() @@ -317,15 +328,16 @@ def describe_and_process(sobject_name): entries.append(entry) # Handle ListViews - views = get_views_list(sf) - mdata = metadata.new() - properties = {f"ListView_{o['SobjectType']}_{o['DeveloperName']}": dict(type=['null', 'object', 'string']) for o in views} - for name in properties.keys(): - mdata = 
metadata.write(mdata, ('properties', name), 'selected-by-default', True) - mdata = metadata.write(mdata, (), 'forced-replication-method', {'replication-method': 'FULL_TABLE'}) - mdata = metadata.write(mdata, (), 'table-key-properties', []) - schema = {'type': 'object', 'additionalProperties': False, 'properties': properties} - entries.append({'stream': "ListViews", 'tap_stream_id': "ListViews", 'schema': schema, 'metadata': metadata.to_list(mdata)}) + if sf.list_views is True: + views = get_views_list(sf) + mdata = metadata.new() + properties = {f"ListView_{o['SobjectType']}_{o['DeveloperName']}": dict(type=['null', 'object', 'string']) for o in views} + for name in properties.keys(): + mdata = metadata.write(mdata, ('properties', name), 'selected-by-default', True) + mdata = metadata.write(mdata, (), 'forced-replication-method', {'replication-method': 'FULL_TABLE'}) + mdata = metadata.write(mdata, (), 'table-key-properties', []) + schema = {'type': 'object', 'additionalProperties': False, 'properties': properties} + entries.append({'stream': "ListViews", 'tap_stream_id': "ListViews", 'schema': schema, 'metadata': metadata.to_list(mdata)}) # Handle Reports if sf.list_reports is True: diff --git a/tap_salesforce/salesforce/bulk.py b/tap_salesforce/salesforce/bulk.py index d36a2842..072e4e4c 100644 --- a/tap_salesforce/salesforce/bulk.py +++ b/tap_salesforce/salesforce/bulk.py @@ -280,6 +280,46 @@ def _get_batch(self, job_id, batch_id): return batch['batchInfo'] + # Function to fetch and process each result in parallel + def process_result(self, job_id, batch_id, result): + endpoint = f"job/{job_id}/batch/{batch_id}/result/{result}" + url = self.bulk_url.format(self.sf.instance_url, endpoint) + headers = {'Content-Type': 'text/csv'} + + # Use a context manager for temporary file handling + with tempfile.NamedTemporaryFile(mode="w+", encoding="utf8", delete=False) as csv_file: + # Stream the CSV content from Salesforce Bulk API + try: + resp = self.sf._make_request('GET', url, headers=headers, stream=True) + resp.raise_for_status() # Ensure we handle errors from the request + + # Write chunks of CSV data to the temp file + for chunk in resp.iter_content(chunk_size=ITER_CHUNK_SIZE, decode_unicode=True): + if chunk: + csv_file.write(chunk.replace('\0', '')) # Replace NULL bytes + + csv_file.seek(0) # Move back to the start of the file after writing + + except requests.exceptions.RequestException as e: + # Handle any request errors (timeouts, connection errors, etc.) 
+ raise TapSalesforceException(f"Error fetching results: {str(e)}") + + # Now process the CSV file + with open(csv_file.name, mode='r', encoding='utf8') as f: + csv_reader = csv.reader(f, delimiter=',', quotechar='"') + + try: + # Read column names from the first line + column_name_list = next(csv_reader) + except StopIteration: + # Handle case where no data is returned (empty CSV) + raise TapSalesforceException(f"No data found in batch {batch_id} result.") + + # Process each row in the CSV file + for line in csv_reader: + record = dict(zip(column_name_list, line)) + yield record + def get_batch_results(self, job_id, batch_id, catalog_entry): """Given a job_id and batch_id, queries the batch results and reads CSV lines, yielding each line as a record.""" @@ -295,50 +335,10 @@ def get_batch_results(self, job_id, batch_id, catalog_entry): # Parse the result list from the XML response batch_result_list = xmltodict.parse(batch_result_resp.text, xml_attribs=False, force_list={'result'})['result-list'] - # Function to fetch and process each result in parallel - def process_result(self, job_id, batch_id, result): - endpoint = f"job/{job_id}/batch/{batch_id}/result/{result}" - url = self.bulk_url.format(self.sf.instance_url, endpoint) - headers = {'Content-Type': 'text/csv'} - - # Use a context manager for temporary file handling - with tempfile.NamedTemporaryFile(mode="w+", encoding="utf8", delete=False) as csv_file: - # Stream the CSV content from Salesforce Bulk API - try: - resp = self.sf._make_request('GET', url, headers=headers, stream=True) - resp.raise_for_status() # Ensure we handle errors from the request - - # Write chunks of CSV data to the temp file - for chunk in resp.iter_content(chunk_size=ITER_CHUNK_SIZE, decode_unicode=True): - if chunk: - csv_file.write(chunk.replace('\0', '')) # Replace NULL bytes - - csv_file.seek(0) # Move back to the start of the file after writing - - except requests.exceptions.RequestException as e: - # Handle any request errors (timeouts, connection errors, etc.) 
- raise TapSalesforceException(f"Error fetching results: {str(e)}") - - # Now process the CSV file - with open(csv_file.name, mode='r', encoding='utf8') as f: - csv_reader = csv.reader(f, delimiter=',', quotechar='"') - - try: - # Read column names from the first line - column_name_list = next(csv_reader) - except StopIteration: - # Handle case where no data is returned (empty CSV) - raise TapSalesforceException(f"No data found in batch {batch_id} result.") - - # Process each row in the CSV file - for line in csv_reader: - record = dict(zip(column_name_list, line)) - yield record - # Use ThreadPoolExecutor to parallelize the processing of results with ThreadPoolExecutor() as executor: # Submit tasks to the executor for parallel execution - futures = [executor.submit(process_result, result) for result in batch_result_list['result']] + futures = [executor.submit(self.process_result, job_id, batch_id, result) for result in batch_result_list['result']] # Yield the results as they complete for future in futures: From ab6a26b0bbeff82dd5dc82cc873b168403b40001 Mon Sep 17 00:00:00 2001 From: Renan Butkeraites Date: Thu, 3 Oct 2024 16:17:16 -0300 Subject: [PATCH 4/6] Adjust pk chunking with a fall back to rest request --- tap_salesforce/__init__.py | 5 +- tap_salesforce/salesforce/__init__.py | 21 +++- tap_salesforce/salesforce/bulk.py | 158 ++++++++++++++------------ tap_salesforce/sync.py | 3 - 4 files changed, 106 insertions(+), 81 deletions(-) diff --git a/tap_salesforce/__init__.py b/tap_salesforce/__init__.py index 3ad471d2..8548cb8a 100644 --- a/tap_salesforce/__init__.py +++ b/tap_salesforce/__init__.py @@ -13,7 +13,6 @@ TapSalesforceException, TapSalesforceQuotaExceededException, TapSalesforceBulkAPIDisabledException) from concurrent.futures import ThreadPoolExecutor, as_completed -from functools import partial LOGGER = singer.get_logger() @@ -555,10 +554,10 @@ def main_impl(): # Use ThreadPoolExecutor to process the catalog entries in parallel using threads with ThreadPoolExecutor() as executor: # Partial function with shared session and config - process_func = partial(process_catalog_entry, sf_data=sf_data, state=args.state, catalog=catalog, config=CONFIG) + state = args.state # Submit tasks to the executor for each stream - futures = [executor.submit(process_func, stream) for stream in catalog["streams"]] + futures = [executor.submit(process_catalog_entry, stream, sf_data, state, catalog, CONFIG) for stream in catalog["streams"]] # Optionally wait for all tasks to complete and handle exceptions for future in futures: diff --git a/tap_salesforce/salesforce/__init__.py b/tap_salesforce/salesforce/__init__.py index 186a9209..8817ece6 100644 --- a/tap_salesforce/salesforce/__init__.py +++ b/tap_salesforce/salesforce/__init__.py @@ -289,13 +289,18 @@ def _make_request(self, http_method, url, headers=None, body=None, stream=False, elif http_method == "POST": LOGGER.info("Making %s request to %s with body %s", http_method, url, body) resp = self.session.post(url, headers=headers, data=body) + LOGGER.info("Completed %s request to %s with body: %s", http_method, url, body) else: raise TapSalesforceException("Unsupported HTTP method") try: resp.raise_for_status() except RequestException as ex: - raise ex + try: + if "is not supported to use PKChunking" not in ex.response.json()['exceptionMessage']: + raise ex + except: + raise ex if resp.headers.get('Sforce-Limit-Info') is not None: self.rest_requests_attempted += 1 @@ -435,10 +440,16 @@ def query(self, catalog_entry, state, query_override=None): 
if state["bookmarks"].get("ListView"): if state["bookmarks"]["ListView"].get("SystemModstamp"): del state["bookmarks"]["ListView"]["SystemModstamp"] - if self.api_type == BULK_API_TYPE and query_override is None: - bulk = Bulk(self) - return bulk.query(catalog_entry, state) - elif self.api_type == REST_API_TYPE or query_override is not None: + try_rest_call = False + try: + if self.api_type == BULK_API_TYPE and query_override is None: + bulk = Bulk(self) + return bulk.query(catalog_entry, state) + except Exception as e: + LOGGER.warning(f"[FAILURE] BULK API failed for catalog entry {catalog_entry} and state {state}. Trying a rest call.") + LOGGER.info(e) + try_rest_call = True + if try_rest_call or self.api_type == REST_API_TYPE or query_override is not None: rest = Rest(self) return rest.query(catalog_entry, state, query_override=query_override) else: diff --git a/tap_salesforce/salesforce/bulk.py b/tap_salesforce/salesforce/bulk.py index 072e4e4c..4ae50fa8 100644 --- a/tap_salesforce/salesforce/bulk.py +++ b/tap_salesforce/salesforce/bulk.py @@ -49,6 +49,7 @@ def __init__(self, sf): # Set csv max reading size to the platform's max size available. csv.field_size_limit(sys.maxsize) self.sf = sf + self.closed_jobs = [] def has_permissions(self): try: @@ -113,25 +114,47 @@ def _can_pk_chunk_job(self, failure_message): # pylint: disable=no-self-use "Failed to write query result" in failure_message def _bulk_query(self, catalog_entry, state): - start_date = self.sf.get_start_date(state, catalog_entry) - batch_status = self._bulk_query_with_pk_chunking(catalog_entry, start_date) - job_id = batch_status['job_id'] - self.sf.pk_chunking = True - # Write job ID and batch state for resumption - tap_stream_id = catalog_entry['tap_stream_id'] - state = singer.write_bookmark(state, tap_stream_id, 'JobID', job_id) - state = singer.write_bookmark(state, tap_stream_id, 'BatchIDs', batch_status['completed'][:]) - - # Parallelize the batch result processing - with ThreadPoolExecutor() as executor: - futures = [ - executor.submit(self.process_batch, job_id, completed_batch_id, catalog_entry, state) - for completed_batch_id in batch_status['completed'] - ] - - # Process the results as they complete - for future in futures: - for result in future.result(): + try: + start_date = self.sf.get_start_date(state, catalog_entry) + batch_status = self._bulk_query_with_pk_chunking(catalog_entry, start_date) + job_id = batch_status['job_id'] + self.sf.pk_chunking = True + # Write job ID and batch state for resumption + tap_stream_id = catalog_entry['tap_stream_id'] + state = singer.write_bookmark(state, tap_stream_id, 'JobID', job_id) + state = singer.write_bookmark(state, tap_stream_id, 'BatchIDs', batch_status['completed'][:]) + + # Parallelize the batch result processing + with ThreadPoolExecutor() as executor: + futures = [ + executor.submit(self.process_batch, job_id, completed_batch_id, catalog_entry, state) + for completed_batch_id in batch_status['completed'] + ] + + # Process the results as they complete + for future in futures: + for result in future.result(): + yield result + except Exception as e: + if job_id in self.closed_jobs: + LOGGER.info(f"Another batch failed before. Ignoring this new job...") + pass + LOGGER.info(f"PK Chunking failled on job {job_id}. 
Trying without it...") + self._close_job(job_id) + + job_id = self._create_job(catalog_entry) + start_date = self.sf.get_start_date(state, catalog_entry) + self.sf.pk_chunking = False + + batch_id = self._add_batch(catalog_entry, job_id, start_date) + + self._close_job(job_id) + + batch_status = self._poll_on_batch_status(job_id, batch_id) + if batch_status['state'] == 'Failed': + raise TapSalesforceException(batch_status['stateMessage']) + else: + for result in self.get_batch_results(job_id, batch_id, catalog_entry): yield result def process_batch(self, job_id, batch_id, catalog_entry, state): @@ -146,7 +169,7 @@ def process_batch(self, job_id, batch_id, catalog_entry, state): singer.write_state(state) def _bulk_query_with_pk_chunking(self, catalog_entry, start_date): - LOGGER.info("Retrying Bulk Query with PK Chunking") + LOGGER.info("Trying Bulk Query with PK Chunking") # Create a new job job_id = self._create_job(catalog_entry, True) @@ -220,6 +243,8 @@ def _poll_on_pk_chunked_batch_status(self, job_id): if not queued_batches and not in_progress_batches: completed_batches = [b['id'] for b in batches if b['state'] == "Completed"] failed_batches = [b['id'] for b in batches if b['state'] == "Failed"] + if len(failed_batches) > 0: + LOGGER.error(f"{[{b['id']:b.get('stateMessage')} for b in batches if b['state'] == 'Failed']}") return {'completed': completed_batches, 'failed': failed_batches} else: time.sleep(PK_CHUNKED_BATCH_STATUS_POLLING_SLEEP) @@ -230,6 +255,7 @@ def _poll_on_batch_status(self, job_id, batch_id): batch_id=batch_id) while batch_status['state'] not in ['Completed', 'Failed', 'Not Processed']: + LOGGER.info(f'job_id: {job_id}, batch_id: {batch_id} - batch_status["state"]: {batch_status["state"]} - Sleeping for {BATCH_STATUS_POLLING_SLEEP} seconds...') time.sleep(BATCH_STATUS_POLLING_SLEEP) batch_status = self._get_batch(job_id=job_id, batch_id=batch_id) @@ -280,73 +306,65 @@ def _get_batch(self, job_id, batch_id): return batch['batchInfo'] - # Function to fetch and process each result in parallel - def process_result(self, job_id, batch_id, result): - endpoint = f"job/{job_id}/batch/{batch_id}/result/{result}" - url = self.bulk_url.format(self.sf.instance_url, endpoint) - headers = {'Content-Type': 'text/csv'} - - # Use a context manager for temporary file handling - with tempfile.NamedTemporaryFile(mode="w+", encoding="utf8", delete=False) as csv_file: - # Stream the CSV content from Salesforce Bulk API - try: - resp = self.sf._make_request('GET', url, headers=headers, stream=True) - resp.raise_for_status() # Ensure we handle errors from the request - - # Write chunks of CSV data to the temp file - for chunk in resp.iter_content(chunk_size=ITER_CHUNK_SIZE, decode_unicode=True): - if chunk: - csv_file.write(chunk.replace('\0', '')) # Replace NULL bytes - - csv_file.seek(0) # Move back to the start of the file after writing - - except requests.exceptions.RequestException as e: - # Handle any request errors (timeouts, connection errors, etc.) 
- raise TapSalesforceException(f"Error fetching results: {str(e)}") - - # Now process the CSV file - with open(csv_file.name, mode='r', encoding='utf8') as f: - csv_reader = csv.reader(f, delimiter=',', quotechar='"') - - try: - # Read column names from the first line - column_name_list = next(csv_reader) - except StopIteration: - # Handle case where no data is returned (empty CSV) - raise TapSalesforceException(f"No data found in batch {batch_id} result.") - - # Process each row in the CSV file - for line in csv_reader: - record = dict(zip(column_name_list, line)) - yield record - def get_batch_results(self, job_id, batch_id, catalog_entry): """Given a job_id and batch_id, queries the batch results and reads CSV lines, yielding each line as a record.""" headers = self._get_bulk_headers() endpoint = f"job/{job_id}/batch/{batch_id}/result" - url = self.bulk_url.format(self.sf.instance_url, endpoint) + batch_url = self.bulk_url.format(self.sf.instance_url, endpoint) # Timing the request with metrics.http_request_timer("batch_result_list") as timer: timer.tags['sobject'] = catalog_entry['stream'] - batch_result_resp = self.sf._make_request('GET', url, headers=headers) + batch_result_resp = self.sf._make_request('GET', batch_url, headers=headers) # Parse the result list from the XML response batch_result_list = xmltodict.parse(batch_result_resp.text, xml_attribs=False, force_list={'result'})['result-list'] # Use ThreadPoolExecutor to parallelize the processing of results - with ThreadPoolExecutor() as executor: - # Submit tasks to the executor for parallel execution - futures = [executor.submit(self.process_result, job_id, batch_id, result) for result in batch_result_list['result']] - - # Yield the results as they complete - for future in futures: - # `future.result()` is a generator, so yield each record from it - for record in future.result(): + for result in batch_result_list['result']: + url = batch_url + f"/{result}" + headers['Content-Type'] = 'text/csv' + + # Use a context manager for temporary file handling + with tempfile.NamedTemporaryFile(mode="w+", encoding="utf8", delete=False) as csv_file: + # Stream the CSV content from Salesforce Bulk API + try: + resp = self.sf._make_request('GET', url, headers=headers, stream=True) + resp.raise_for_status() # Ensure we handle errors from the request + + # Write chunks of CSV data to the temp file + for chunk in resp.iter_content(chunk_size=ITER_CHUNK_SIZE, decode_unicode=True): + if chunk: + csv_file.write(chunk.replace('\0', '')) # Replace NULL bytes + + csv_file.seek(0) # Move back to the start of the file after writing + + except requests.exceptions.RequestException as e: + # Handle any request errors (timeouts, connection errors, etc.) + raise TapSalesforceException(f"Error fetching results: {str(e)}") + + # Now process the CSV file + with open(csv_file.name, mode='r', encoding='utf8') as f: + csv_reader = csv.reader(f, delimiter=',', quotechar='"') + + try: + # Read column names from the first line + column_name_list = next(csv_reader) + except StopIteration: + # Handle case where no data is returned (empty CSV) + raise TapSalesforceException(f"No data found in batch {batch_id} result.") + + # Process each row in the CSV file + for line in csv_reader: + record = dict(zip(column_name_list, line)) yield record def _close_job(self, job_id): + if job_id in self.closed_jobs: + LOGGER.info(f"Job {job_id} already closed. 
Skipping the request") + return + self.closed_jobs.append(job_id) endpoint = "job/{}".format(job_id) url = self.bulk_url.format(self.sf.instance_url, endpoint) body = {"state": "Closed"} diff --git a/tap_salesforce/sync.py b/tap_salesforce/sync.py index 92f9586d..16640f6d 100644 --- a/tap_salesforce/sync.py +++ b/tap_salesforce/sync.py @@ -222,7 +222,6 @@ def sync_records(sf, catalog_entry, state, input_state, counter, catalog,config= start_time = singer_utils.now() LOGGER.info('Syncing Salesforce data for stream %s', stream) - records_post = [] if "/" in state["current_stream"]: # get current name @@ -367,8 +366,6 @@ def sync_list_views_stream(sf, catalog_entry, state, input_state, catalog, repli if selected_list==f"ListView_{isob['SobjectType']}_{isob['DeveloperName']}": selected_lists_names.append(isob) - replication_key_value = replication_key and singer_utils.strptime_with_tz(rec[replication_key]) - for list_info in selected_lists_names: sobject = list_info['SobjectType'] lv_name = list_info['DeveloperName'] From 6e2507004de38c0f058b415d7a1c5fc05cd57b23 Mon Sep 17 00:00:00 2001 From: Renan Butkeraites Date: Mon, 7 Oct 2024 17:03:17 +0200 Subject: [PATCH 5/6] Add another batch key chunk try with a smaller chunk size --- tap_salesforce/salesforce/bulk.py | 105 +++++++++++++++++------------- tap_salesforce/sync.py | 5 +- 2 files changed, 60 insertions(+), 50 deletions(-) diff --git a/tap_salesforce/salesforce/bulk.py b/tap_salesforce/salesforce/bulk.py index 4ae50fa8..34c4adc9 100644 --- a/tap_salesforce/salesforce/bulk.py +++ b/tap_salesforce/salesforce/bulk.py @@ -18,7 +18,8 @@ BATCH_STATUS_POLLING_SLEEP = 20 PK_CHUNKED_BATCH_STATUS_POLLING_SLEEP = 60 ITER_CHUNK_SIZE = 1024 -DEFAULT_CHUNK_SIZE = 100000 # Max is 250000 +DEFAULT_CHUNK_SIZE = 250000 # Max is 250000 +FALLBACK_CHUNK_SIZE = 2000 LOGGER = singer.get_logger() @@ -113,49 +114,60 @@ def _can_pk_chunk_job(self, failure_message): # pylint: disable=no-self-use "Retried more than 15 times" in failure_message or \ "Failed to write query result" in failure_message + def try_bulking_with_pk_chunking(self, catalog_entry, state, use_fall_back_chunk_size=False): + start_date = self.sf.get_start_date(state, catalog_entry) + batch_status = self._bulk_query_with_pk_chunking(catalog_entry, start_date, use_fall_back_chunk_size) + job_id = batch_status['job_id'] + self.sf.pk_chunking = True + # Write job ID and batch state for resumption + tap_stream_id = catalog_entry['tap_stream_id'] + self.tap_stream_id = tap_stream_id + state = singer.write_bookmark(state, tap_stream_id, 'JobID', job_id) + state = singer.write_bookmark(state, tap_stream_id, 'BatchIDs', batch_status['completed'][:]) + + # Parallelize the batch result processing + with ThreadPoolExecutor() as executor: + futures = [ + executor.submit(self.process_batch, job_id, completed_batch_id, catalog_entry, state) + for completed_batch_id in batch_status['completed'] + ] + + # Process the results as they complete + for future in futures: + for result in future.result(): + yield result + def _bulk_query(self, catalog_entry, state): try: - start_date = self.sf.get_start_date(state, catalog_entry) - batch_status = self._bulk_query_with_pk_chunking(catalog_entry, start_date) - job_id = batch_status['job_id'] - self.sf.pk_chunking = True - # Write job ID and batch state for resumption - tap_stream_id = catalog_entry['tap_stream_id'] - state = singer.write_bookmark(state, tap_stream_id, 'JobID', job_id) - state = singer.write_bookmark(state, tap_stream_id, 'BatchIDs', 
batch_status['completed'][:])
-
-            # Parallelize the batch result processing
-            with ThreadPoolExecutor() as executor:
-                futures = [
-                    executor.submit(self.process_batch, job_id, completed_batch_id, catalog_entry, state)
-                    for completed_batch_id in batch_status['completed']
-                ]
-
-                # Process the results as they complete
-                for future in futures:
-                    for result in future.result():
-                        yield result
+            yield from self.try_bulking_with_pk_chunking(catalog_entry, state)
         except Exception as e:
-            if job_id in self.closed_jobs:
-                LOGGER.info(f"Another batch failed before. Ignoring this new job...")
-                pass
-            LOGGER.info(f"PK Chunking failed on job {job_id}. Trying without it...")
-            self._close_job(job_id)
-
-            job_id = self._create_job(catalog_entry)
-            start_date = self.sf.get_start_date(state, catalog_entry)
-            self.sf.pk_chunking = False
-
-            batch_id = self._add_batch(catalog_entry, job_id, start_date)
-
-            self._close_job(job_id)
-
-            batch_status = self._poll_on_batch_status(job_id, batch_id)
-            if batch_status['state'] == 'Failed':
-                raise TapSalesforceException(batch_status['stateMessage'])
-            else:
-                for result in self.get_batch_results(job_id, batch_id, catalog_entry):
-                    yield result
+            try:
+                yield from self.try_bulking_with_pk_chunking(catalog_entry, state, True)
+            except Exception as e:
+                if job_id in self.closed_jobs:
+                    LOGGER.info(f"Another batch failed before. Ignoring this new job...")
+                    pass
+                LOGGER.info(f"PK Chunking failed on job {job_id}. Trying without it...")
+                self._close_job(job_id)
+
+                if hasattr(self,"tap_stream_id"):
+                    with open("streams_pk_chunking_failing.txt", "a") as file:
+                        file.write(self.tap_stream_id + "\n") # Append data with a newline character
+
+                job_id = self._create_job(catalog_entry)
+                start_date = self.sf.get_start_date(state, catalog_entry)
+                self.sf.pk_chunking = False
+
+                batch_id = self._add_batch(catalog_entry, job_id, start_date)
+
+                self._close_job(job_id)
+
+                batch_status = self._poll_on_batch_status(job_id, batch_id)
+                if batch_status['state'] == 'Failed':
+                    raise TapSalesforceException(batch_status['stateMessage'])
+                else:
+                    for result in self.get_batch_results(job_id, batch_id, catalog_entry):
+                        yield result
 
     def process_batch(self, job_id, batch_id, catalog_entry, state):
         """Process a single batch and yield results."""
@@ -168,11 +180,11 @@ def process_batch(self, job_id, batch_id, catalog_entry, state):
             LOGGER.info("Batches to go: %d", len(state['bookmarks'][catalog_entry['tap_stream_id']]["BatchIDs"]))
             singer.write_state(state)
 
-    def _bulk_query_with_pk_chunking(self, catalog_entry, start_date):
+    def _bulk_query_with_pk_chunking(self, catalog_entry, start_date, use_fall_back_chunk_size=False):
         LOGGER.info("Trying Bulk Query with PK Chunking")
 
         # Create a new job
-        job_id = self._create_job(catalog_entry, True)
+        job_id = self._create_job(catalog_entry, True, use_fall_back_chunk_size)
 
         self._add_batch(catalog_entry, job_id, start_date, False)
 
@@ -187,7 +199,7 @@ def _bulk_query_with_pk_chunking(self, catalog_entry, start_date):
 
         return batch_status
 
-    def _create_job(self, catalog_entry, pk_chunking=False):
+    def _create_job(self, catalog_entry, pk_chunking=False, use_fall_back_chunk_size=False):
         url = self.bulk_url.format(self.sf.instance_url, "job")
         body = {"operation": "queryAll", "object": catalog_entry['stream'], "contentType": "CSV"}
 
@@ -196,8 +208,9 @@ def _create_job(self, catalog_entry, pk_chunking=False):
 
         if pk_chunking:
             LOGGER.info("ADDING PK CHUNKING HEADER")
-
-            headers['Sforce-Enable-PKChunking'] = "true; chunkSize={}".format(DEFAULT_CHUNK_SIZE) 
+ chunk_size = DEFAULT_CHUNK_SIZE if not use_fall_back_chunk_size else FALLBACK_CHUNK_SIZE + headers['Sforce-Enable-PKChunking'] = "true; chunkSize={}".format(chunk_size) + LOGGER.info(f"[use_fall_back_chunk_size:{use_fall_back_chunk_size}] HEADERS: {headers}") # If the stream ends with 'CleanInfo' or 'History', we can PK Chunk on the object's parent if any(catalog_entry['stream'].endswith(suffix) for suffix in ["CleanInfo", "History"]): diff --git a/tap_salesforce/sync.py b/tap_salesforce/sync.py index 16640f6d..eac03d9f 100644 --- a/tap_salesforce/sync.py +++ b/tap_salesforce/sync.py @@ -204,10 +204,7 @@ def handle_ListView(sf,rec_id,sobject,lv_name,lv_catalog_entry,state,input_state time_extracted=start_time)) def sync_records(sf, catalog_entry, state, input_state, counter, catalog,config=None): - download_files = False - if "download_files" in config: - if config['download_files']==True: - download_files = True + download_files = config.get("download_files", False) chunked_bookmark = singer_utils.strptime_with_tz(sf.get_start_date(state, catalog_entry)) stream = catalog_entry['stream'] schema = catalog_entry['schema'] From 1479a73e7708fb9260b5905107eaab7224d40096 Mon Sep 17 00:00:00 2001 From: Renan Butkeraites Date: Mon, 7 Oct 2024 19:01:39 +0200 Subject: [PATCH 6/6] Get rid of the blacklisted_objects concept --- tap_salesforce/__init__.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tap_salesforce/__init__.py b/tap_salesforce/__init__.py index 8548cb8a..9a5e360f 100644 --- a/tap_salesforce/__init__.py +++ b/tap_salesforce/__init__.py @@ -286,8 +286,7 @@ def do_discover(sf: Salesforce): # Function to describe an object and generate its schema def describe_and_process(sobject_name): # Skip blacklisted SF objects - if (sobject_name in sf.get_blacklisted_objects() and sobject_name not in ACTIVITY_STREAMS) \ - or sobject_name.endswith("ChangeEvent"): + if sobject_name.endswith("ChangeEvent"): return None sobject_description = sf.describe(sobject_name)
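
The fallback ladder that patches 4 and 5 build into Bulk._bulk_query and Salesforce.query is spread across several hunks, so the sketch below restates the control flow in one place. It is illustrative only and not part of the patch series: run_pk_chunked_job, run_single_batch_job, run_rest_query and sync_stream are hypothetical stand-ins (the PK-chunked Bulk job, the plain single-batch Bulk job, and the Rest.query fallback), the two layers are collapsed into one function, and only DEFAULT_CHUNK_SIZE and FALLBACK_CHUNK_SIZE mirror values from bulk.py.

DEFAULT_CHUNK_SIZE = 250000   # mirrors bulk.py: Salesforce's maximum PK chunk size
FALLBACK_CHUNK_SIZE = 2000    # mirrors bulk.py: much smaller chunk size for the second attempt


def run_pk_chunked_job(stream, chunk_size):
    """Hypothetical stand-in for a Bulk API job created with Sforce-Enable-PKChunking."""
    raise NotImplementedError


def run_single_batch_job(stream):
    """Hypothetical stand-in for a plain Bulk API job with a single queryAll batch."""
    raise NotImplementedError


def run_rest_query(stream):
    """Hypothetical stand-in for the REST API query used as the last resort."""
    raise NotImplementedError


def sync_stream(stream):
    """Yield records for one stream, trying progressively more conservative strategies."""
    attempts = (
        lambda: run_pk_chunked_job(stream, DEFAULT_CHUNK_SIZE),   # PK chunking at the default (max) size
        lambda: run_pk_chunked_job(stream, FALLBACK_CHUNK_SIZE),  # retry with the small chunk size (patch 5)
        lambda: run_single_batch_job(stream),                     # plain Bulk job, no chunking (patch 4)
        lambda: run_rest_query(stream),                           # REST query as the final fallback (patch 4)
    )
    last_error = None
    for attempt in attempts:
        try:
            yield from attempt()
            return
        except Exception as exc:  # a failed strategy falls through to the next one
            last_error = exc
    raise last_error

Under those assumptions, iterating sync_stream("Account") tries each strategy in order and only raises if all four fail, which is the behaviour the nested try/except blocks in _bulk_query and the try_rest_call flag in Salesforce.query aim for.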