diff --git a/target_salesforce_v3/client.py b/target_salesforce_v3/client.py index c9a8ef7..270286d 100644 --- a/target_salesforce_v3/client.py +++ b/target_salesforce_v3/client.py @@ -98,6 +98,7 @@ def check_salesforce_limits(self, response): percent_used_from_total = (remaining / allotted) * 100 if percent_used_from_total > quota_percent_total: + self._target.hit_rate_limit = True total_message = ( "Salesforce has reported {}/{} ({:3.2f}%) total REST quota " "used across all Salesforce Applications. Terminating " @@ -233,28 +234,30 @@ def sf_fields(self, object_type=None): sobject = self.request_api("GET", f"sobjects/{object_type}/describe/") return [f for f in sobject.json()["fields"]] - def sf_fields_description(self, object_type=None): - fld = self.sf_fields(object_type=object_type) + def sf_fields_description(self, object_type=None, object_fields=None): + if not object_fields: + object_fields = self.sf_fields(object_type=object_type) + fields = {} fields["createable"] = [ - f["name"] for f in fld if f["createable"] and not f["custom"] + f["name"] for f in object_fields if f["createable"] and not f["custom"] ] fields["custom"] = [ - f["name"] for f in fld if f["custom"] + f["name"] for f in object_fields if f["custom"] ] fields["createable_not_default"] = [ f["name"] - for f in fld + for f in object_fields if f["createable"] and not f["defaultedOnCreate"] and not f["custom"] ] fields["required"] = [ f["name"] - for f in fld + for f in object_fields if not f["nillable"] and f["createable"] and not f["defaultedOnCreate"] ] - fields["external_ids"] = [f["name"] for f in fld if f["externalId"]] + fields["external_ids"] = [f["name"] for f in object_fields if f["externalId"]] fields["pickable"] = {} - for field in fld: + for field in object_fields: if field["picklistValues"]: fields["pickable"][field["name"]] = [ p["label"] for p in field["picklistValues"] if p["active"] diff --git a/target_salesforce_v3/sinks.py b/target_salesforce_v3/sinks.py index b6d1743..579e715 100644 --- a/target_salesforce_v3/sinks.py +++ b/target_salesforce_v3/sinks.py @@ -11,6 +11,7 @@ from dateutil.parser import parse from datetime import datetime from singer_sdk.exceptions import FatalAPIError, RetriableAPIError +from target_salesforce_v3.client import TargetSalesforceQuotaExceededException class MissingObjectInSalesforceError(Exception): @@ -716,151 +717,147 @@ def lookup_fields_dict(self): def name(self): return self.stream_name - def get_fields_for_object(self, object_type): + def get_fields_for_object(self, object_type, objects_list): """Check if Salesforce has an object type and fetches its fields.""" - req = self.request_api("GET") - for object in req.json().get("sobjects", []): + for object in objects_list: if object["name"] == object_type or object["label"] == object_type or object["labelPlural"] == object_type: obj_req = self.request_api("GET", endpoint=f"sobjects/{object['name']}/describe").json() return {f["name"]: f for f in obj_req.get("fields", [])} - raise MissingObjectInSalesforceError(f"Object type {object_type} not found in Salesforce.") def preprocess_record(self, record, context): - # Check if object exists in Salesforce - object_type = None - req = self.request_api("GET", "sobjects") - objects_list = req.json().get("sobjects", []) - for object in objects_list: - is_name = object["name"] == self.stream_name - is_label = object["label"] == self.stream_name - is_label_plural = object["labelPlural"] == self.stream_name - if is_name or is_label or is_label_plural: - self.logger.info(f"Processing record for type {self.stream_name}. Using fallback sink.") - object_type = object["name"] - break - - if not object_type: - self.logger.info(f"Record doesn't exist on Salesforce {self.stream_name} was not found on Salesforce.") - return {} - - try: - fields = self.get_fields_for_object(object_type) - except MissingObjectInSalesforceError: - self.logger.info("Skipping record, because it was not found on Salesforce.") - return {} - record["object_type"] = object_type - - # If lookup_fields dict exist in config use it to check if the record exists in Salesforce - object_lookup_field = self.lookup_fields_dict.get(object_type) - # check if the lookup field exists for the object - object_lookup_field = object_lookup_field if object_lookup_field in fields else None - # check if the record has a value for the lookup field - lookup_value = record.get(object_lookup_field) - - req = None - # lookup for record with field from config - if object_lookup_field and lookup_value: - query_fields = ",".join([field for field in fields.keys() if field in record] + ["Id"]) - query = f"SELECT {query_fields} FROM {object_type} WHERE {object_lookup_field} = '{lookup_value}'" - req = self.request_api("GET", "queryAll", params={"q": query}) - req = req.json().get("records") - # lookup for record with email fields - else: - # Try to find object instance using email - email_fields = ["Email", "npe01__AlternateEmail__c", "npe01__HomeEmail__c", "npe01__Preferred_Email__c", "npe01__WorkEmail__c"] - email_values = [record.get(email_field) for email_field in email_fields if record.get(email_field)] - - for email_to_check in email_values: - # Escape special characters on email - for char in ["+", "-"]: - if char in email_to_check: - email_to_check = email_to_check.replace(char, f"\{char}") - - query = "".join(["FIND {", email_to_check, "} ", f" IN ALL FIELDS RETURNING {object_type}(id)"]) - req = self.request_api("GET", "search/", params={"q": query}) - req = req.json().get("searchRecords") + # not process records if target hit API rate limits + if not self._target.hit_rate_limit: + try: + # Check if object exists in Salesforce + object_type = None + + # get list of objects + if not self._target.sobjects: + req = self.request_api("GET", "sobjects") + self._target.sobjects = req.json().get("sobjects", []) + objects_list = self._target.sobjects + + # find sobject for record + for object in objects_list: + is_name = object["name"] == self.stream_name + is_label = object["label"] == self.stream_name + is_label_plural = object["labelPlural"] == self.stream_name + if is_name or is_label or is_label_plural: + self.logger.info(f"Processing record for type {self.stream_name}. Using fallback sink.") + object_type = object["name"] + break + + if not object_type: + return {"error": f"Object not found for {self.name} in Salesforce"} + + # get record fields + try: + self._target.current_sink_name = self.name + if not self._target.current_fields or self.name != self._target.current_sink_name: + self._target.current_fields = self.get_fields_for_object(object_type, objects_list) + fields = self._target.current_fields + except MissingObjectInSalesforceError: + self.logger.info("Skipping record, because it was not found on Salesforce.") + return {} + record["object_type"] = object_type + + # If lookup_fields dict exist in config use it to check if the record exists in Salesforce + object_lookup_field = self.lookup_fields_dict.get(object_type) + # check if the lookup field exists for the object + object_lookup_field = object_lookup_field if object_lookup_field in fields else None + # check if the record has a value for the lookup field + lookup_value = record.get(object_lookup_field) + + req = None + # lookup for record with field from config + if object_lookup_field and lookup_value: + query_fields = ",".join([field for field in fields.keys() if field in record] + ["Id"]) + query = f"SELECT {query_fields} FROM {object_type} WHERE {object_lookup_field} = '{lookup_value}'" + req = self.request_api("GET", "queryAll", params={"q": query}) + req = req.json().get("records") + # lookup for record with email fields + else: + # Try to find object instance using email + email_fields = ["Email", "npe01__AlternateEmail__c", "npe01__HomeEmail__c", "npe01__Preferred_Email__c", "npe01__WorkEmail__c"] + email_values = [record.get(email_field) for email_field in email_fields if record.get(email_field)] + + for email_to_check in email_values: + # Escape special characters on email + for char in ["+", "-"]: + if char in email_to_check: + email_to_check = email_to_check.replace(char, f"\{char}") + + query = "".join(["FIND {", email_to_check, "} ", f" IN ALL FIELDS RETURNING {object_type}(id)"]) + req = self.request_api("GET", "search/", params={"q": query}) + req = req.json().get("searchRecords") + if req: + break + + # if record already exists add its Id for patching if req: - break + existing_record = req[0] + # if flag only_upsert_empty_fields is true, only send fields with currently empty values + if self.config.get("only_upsert_empty_fields"): + record = {k:v for k,v in record.items() if not existing_record.get(k)} + record["Id"] = existing_record["Id"] + return record + except TargetSalesforceQuotaExceededException as e: + return {"error": str(e)} + else: + return {"error": "Unprocessed record due to requests exceeded API rate limits"} - # if record already exists add its Id for patching - if req: - existing_record = req[0] - # if flag only_upsert_empty_fields is true, only send fields with currently empty values - if self.config.get("only_upsert_empty_fields"): - record = {k:v for k,v in record.items() if not existing_record.get(k)} - record["Id"] = existing_record["Id"] - return record def upsert_record(self, record, context): - if record == {} or record is None: - return None, False, {} - - state_updates = dict() - - object_type = record.pop("object_type", None) - self.logger.info(f"Processing record for type {self.stream_name}. Using fallback sink.") - - if record == {}: - self.logger.info(f"Processing record for type {self.stream_name} failed. Check logs.") - return - - fields_desc = self.sf_fields_description(object_type=object_type) - - possible_update_fields = [] + # Not process records if target hit API rate limits + if record.get("error"): + return None, False, record + if record: + state_updates = dict() - for field in fields_desc["external_ids"]: - if field in record: - possible_update_fields.append(field) + # build the right endpoint + object_type = record.pop("object_type", None) + endpoint = f"sobjects/{object_type}" + self.logger.info(f"Processing record for type {self.stream_name}. Using fallback sink.") - if record.get("Id"): - fields = ["Id"] - else: - list_fields = [field_list for field_list in fields_desc.values()] - fields = [] - for list_field in list_fields: - for item in list_field: - fields.append(item) + # check if all payload fields exist in salesforce + fields = self._target.current_fields + fields_desc = self.sf_fields_description(object_type, fields.values()) - endpoint = f"sobjects/{object_type}" + possible_update_fields = [field for field in fields_desc["external_ids"] if field in record] - for field in record.keys(): - if field not in fields: - self.logger.info(f"Field {field} doesn't exist on Salesforce.") + for field in record.keys(): + if field not in fields.keys(): + self.logger.info(f"Field {field} doesn't exist on Salesforce.") + missing_fields = list(set(fields) - set(record.keys())) - missing_fields = list(set(fields) - set(record.keys())) + if len(missing_fields) > 0.5 * len(fields): + self.logger.info(f"This record may require more fields to be mapped. Missing fields: {missing_fields}") - if len(missing_fields) > 0.5 * len(fields): - self.logger.info(f"This record may require more fields to be mapped. Missing fields: {missing_fields}") - - if record.get("Id") or record.get("id"): - object_id = record.pop("Id") or record.pop("id") - url = "/".join([endpoint, object_id]) - try: + if record.get("Id") or record.get("id"): + object_id = record.pop("Id") or record.pop("id") + url = "/".join([endpoint, object_id]) response = self.request_api("PATCH", endpoint=url, request_data=record) if response.status_code == 204: self.logger.info(f"{object_type} updated with id: {object_id}") return object_id, True, state_updates - - id = response.json().get("id") - self.logger.info(f"{object_type} updated with id: {id}") - return id, True, state_updates - except Exception as e: - self.logger.exception(f"Error encountered while updating {object_type}") - - if len(possible_update_fields) > 0: - for id_field in possible_update_fields: - try: - url = "/".join([endpoint, id_field, record.get(id_field)]) - response = self.request_api("PATCH", endpoint=url, request_data={k: record[k] for k in set(list(record.keys())) - set([id_field])}) + else: id = response.json().get("id") self.logger.info(f"{object_type} updated with id: {id}") return id, True, state_updates - except Exception as e: - self.logger.exception(f"Could not PATCH to {url}: {e}") - try: + if len(possible_update_fields) > 0: + for id_field in possible_update_fields: + try: + url = "/".join([endpoint, id_field, record.get(id_field)]) + response = self.request_api("PATCH", endpoint=url, request_data={k: record[k] for k in set(list(record.keys())) - set([id_field])}) + id = response.json().get("id") + self.logger.info(f"{object_type} updated with id: {id}") + return id, True, state_updates + except Exception as e: + self.logger.exception(f"Could not PATCH to {url}: {e}") + if len(possible_update_fields) > 0: self.logger.info("Failed to find updatable entity, trying to create it.") @@ -868,7 +865,4 @@ def upsert_record(self, record, context): id = response.json().get("id") self.logger.info(f"{object_type} created with id: {id}") return id, True, state_updates - except Exception as e: - self.logger.exception(f"Error encountered while creating {object_type}") - raise e - + return None, False, {} diff --git a/target_salesforce_v3/target.py b/target_salesforce_v3/target.py index 288a281..70d3da3 100644 --- a/target_salesforce_v3/target.py +++ b/target_salesforce_v3/target.py @@ -4,6 +4,7 @@ from singer_sdk import typing as th from target_hotglue.target import TargetHotglue +import copy from target_salesforce_v3.sinks import ( FallbackSink, @@ -34,6 +35,11 @@ class TargetSalesforceV3(TargetHotglue): name = "target-salesforce-v3" MAX_PARALLELISM = 10 SINK_TYPES = SINK_TYPES + sobjects = {} + current_sink_name = None + current_fields = {} + hit_rate_limit = False + def get_sink_class(self, stream_name: str): """Get sink for a stream.""" for sink_class in SINK_TYPES: