Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove unneeded requests, save target state when failing in rate limits #16

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions target_salesforce_v3/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def check_salesforce_limits(self, response):
percent_used_from_total = (remaining / allotted) * 100

if percent_used_from_total > quota_percent_total:
self._target.hit_rate_limit = True
total_message = (
"Salesforce has reported {}/{} ({:3.2f}%) total REST quota "
"used across all Salesforce Applications. Terminating "
Expand Down Expand Up @@ -233,28 +234,30 @@ def sf_fields(self, object_type=None):
sobject = self.request_api("GET", f"sobjects/{object_type}/describe/")
return [f for f in sobject.json()["fields"]]

def sf_fields_description(self, object_type=None):
fld = self.sf_fields(object_type=object_type)
def sf_fields_description(self, object_type=None, object_fields=None):
if not object_fields:
object_fields = self.sf_fields(object_type=object_type)

fields = {}
fields["createable"] = [
f["name"] for f in fld if f["createable"] and not f["custom"]
f["name"] for f in object_fields if f["createable"] and not f["custom"]
]
fields["custom"] = [
f["name"] for f in fld if f["custom"]
f["name"] for f in object_fields if f["custom"]
]
fields["createable_not_default"] = [
f["name"]
for f in fld
for f in object_fields
if f["createable"] and not f["defaultedOnCreate"] and not f["custom"]
]
fields["required"] = [
f["name"]
for f in fld
for f in object_fields
if not f["nillable"] and f["createable"] and not f["defaultedOnCreate"]
]
fields["external_ids"] = [f["name"] for f in fld if f["externalId"]]
fields["external_ids"] = [f["name"] for f in object_fields if f["externalId"]]
fields["pickable"] = {}
for field in fld:
for field in object_fields:
if field["picklistValues"]:
fields["pickable"][field["name"]] = [
p["label"] for p in field["picklistValues"] if p["active"]
Expand Down
240 changes: 117 additions & 123 deletions target_salesforce_v3/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from dateutil.parser import parse
from datetime import datetime
from singer_sdk.exceptions import FatalAPIError, RetriableAPIError
from target_salesforce_v3.client import TargetSalesforceQuotaExceededException


class MissingObjectInSalesforceError(Exception):
Expand Down Expand Up @@ -716,159 +717,152 @@ def lookup_fields_dict(self):
def name(self):
return self.stream_name

def get_fields_for_object(self, object_type):
def get_fields_for_object(self, object_type, objects_list):
"""Check if Salesforce has an object type and fetches its fields."""
req = self.request_api("GET")
for object in req.json().get("sobjects", []):
for object in objects_list:
if object["name"] == object_type or object["label"] == object_type or object["labelPlural"] == object_type:
obj_req = self.request_api("GET", endpoint=f"sobjects/{object['name']}/describe").json()
return {f["name"]: f for f in obj_req.get("fields", [])}

raise MissingObjectInSalesforceError(f"Object type {object_type} not found in Salesforce.")

def preprocess_record(self, record, context):
# Check if object exists in Salesforce
object_type = None
req = self.request_api("GET", "sobjects")
objects_list = req.json().get("sobjects", [])
for object in objects_list:
is_name = object["name"] == self.stream_name
is_label = object["label"] == self.stream_name
is_label_plural = object["labelPlural"] == self.stream_name
if is_name or is_label or is_label_plural:
self.logger.info(f"Processing record for type {self.stream_name}. Using fallback sink.")
object_type = object["name"]
break

if not object_type:
self.logger.info(f"Record doesn't exist on Salesforce {self.stream_name} was not found on Salesforce.")
return {}

try:
fields = self.get_fields_for_object(object_type)
except MissingObjectInSalesforceError:
self.logger.info("Skipping record, because it was not found on Salesforce.")
return {}
record["object_type"] = object_type

# If lookup_fields dict exist in config use it to check if the record exists in Salesforce
object_lookup_field = self.lookup_fields_dict.get(object_type)
# check if the lookup field exists for the object
object_lookup_field = object_lookup_field if object_lookup_field in fields else None
# check if the record has a value for the lookup field
lookup_value = record.get(object_lookup_field)

req = None
# lookup for record with field from config
if object_lookup_field and lookup_value:
query_fields = ",".join([field for field in fields.keys() if field in record] + ["Id"])
query = f"SELECT {query_fields} FROM {object_type} WHERE {object_lookup_field} = '{lookup_value}'"
req = self.request_api("GET", "queryAll", params={"q": query})
req = req.json().get("records")
# lookup for record with email fields
else:
# Try to find object instance using email
email_fields = ["Email", "npe01__AlternateEmail__c", "npe01__HomeEmail__c", "npe01__Preferred_Email__c", "npe01__WorkEmail__c"]
email_values = [record.get(email_field) for email_field in email_fields if record.get(email_field)]

for email_to_check in email_values:
# Escape special characters on email
for char in ["+", "-"]:
if char in email_to_check:
email_to_check = email_to_check.replace(char, f"\{char}")

query = "".join(["FIND {", email_to_check, "} ", f" IN ALL FIELDS RETURNING {object_type}(id)"])
req = self.request_api("GET", "search/", params={"q": query})
req = req.json().get("searchRecords")
# not process records if target hit API rate limits
if not self._target.hit_rate_limit:
try:
# Check if object exists in Salesforce
object_type = None

# get list of objects
if not self._target.sobjects:
req = self.request_api("GET", "sobjects")
self._target.sobjects = req.json().get("sobjects", [])
objects_list = self._target.sobjects

# find sobject for record
for object in objects_list:
is_name = object["name"] == self.stream_name
is_label = object["label"] == self.stream_name
is_label_plural = object["labelPlural"] == self.stream_name
if is_name or is_label or is_label_plural:
self.logger.info(f"Processing record for type {self.stream_name}. Using fallback sink.")
object_type = object["name"]
break

if not object_type:
return {"error": f"Object not found for {self.name} in Salesforce"}

# get record fields
try:
self._target.current_sink_name = self.name
if not self._target.current_fields or self.name != self._target.current_sink_name:
self._target.current_fields = self.get_fields_for_object(object_type, objects_list)
fields = self._target.current_fields
except MissingObjectInSalesforceError:
self.logger.info("Skipping record, because it was not found on Salesforce.")
return {}
record["object_type"] = object_type

# If lookup_fields dict exist in config use it to check if the record exists in Salesforce
object_lookup_field = self.lookup_fields_dict.get(object_type)
# check if the lookup field exists for the object
object_lookup_field = object_lookup_field if object_lookup_field in fields else None
# check if the record has a value for the lookup field
lookup_value = record.get(object_lookup_field)

req = None
# lookup for record with field from config
if object_lookup_field and lookup_value:
query_fields = ",".join([field for field in fields.keys() if field in record] + ["Id"])
query = f"SELECT {query_fields} FROM {object_type} WHERE {object_lookup_field} = '{lookup_value}'"
req = self.request_api("GET", "queryAll", params={"q": query})
req = req.json().get("records")
# lookup for record with email fields
else:
# Try to find object instance using email
email_fields = ["Email", "npe01__AlternateEmail__c", "npe01__HomeEmail__c", "npe01__Preferred_Email__c", "npe01__WorkEmail__c"]
email_values = [record.get(email_field) for email_field in email_fields if record.get(email_field)]

for email_to_check in email_values:
# Escape special characters on email
for char in ["+", "-"]:
if char in email_to_check:
email_to_check = email_to_check.replace(char, f"\{char}")

query = "".join(["FIND {", email_to_check, "} ", f" IN ALL FIELDS RETURNING {object_type}(id)"])
req = self.request_api("GET", "search/", params={"q": query})
req = req.json().get("searchRecords")
if req:
break

# if record already exists add its Id for patching
if req:
break
existing_record = req[0]
# if flag only_upsert_empty_fields is true, only send fields with currently empty values
if self.config.get("only_upsert_empty_fields"):
record = {k:v for k,v in record.items() if not existing_record.get(k)}
record["Id"] = existing_record["Id"]
return record
except TargetSalesforceQuotaExceededException as e:
return {"error": str(e)}
else:
return {"error": "Unprocessed record due to requests exceeded API rate limits"}

# if record already exists add its Id for patching
if req:
existing_record = req[0]
# if flag only_upsert_empty_fields is true, only send fields with currently empty values
if self.config.get("only_upsert_empty_fields"):
record = {k:v for k,v in record.items() if not existing_record.get(k)}
record["Id"] = existing_record["Id"]
return record

def upsert_record(self, record, context):
if record == {} or record is None:
return None, False, {}

state_updates = dict()

object_type = record.pop("object_type", None)
self.logger.info(f"Processing record for type {self.stream_name}. Using fallback sink.")

if record == {}:
self.logger.info(f"Processing record for type {self.stream_name} failed. Check logs.")
return

fields_desc = self.sf_fields_description(object_type=object_type)

possible_update_fields = []
# Not process records if target hit API rate limits
if record.get("error"):
return None, False, record
if record:
state_updates = dict()

for field in fields_desc["external_ids"]:
if field in record:
possible_update_fields.append(field)
# build the right endpoint
object_type = record.pop("object_type", None)
endpoint = f"sobjects/{object_type}"
self.logger.info(f"Processing record for type {self.stream_name}. Using fallback sink.")

if record.get("Id"):
fields = ["Id"]
else:
list_fields = [field_list for field_list in fields_desc.values()]
fields = []
for list_field in list_fields:
for item in list_field:
fields.append(item)
# check if all payload fields exist in salesforce
fields = self._target.current_fields
fields_desc = self.sf_fields_description(object_type, fields.values())

endpoint = f"sobjects/{object_type}"
possible_update_fields = [field for field in fields_desc["external_ids"] if field in record]

for field in record.keys():
if field not in fields:
self.logger.info(f"Field {field} doesn't exist on Salesforce.")
for field in record.keys():
if field not in fields.keys():
self.logger.info(f"Field {field} doesn't exist on Salesforce.")

missing_fields = list(set(fields) - set(record.keys()))

missing_fields = list(set(fields) - set(record.keys()))
if len(missing_fields) > 0.5 * len(fields):
self.logger.info(f"This record may require more fields to be mapped. Missing fields: {missing_fields}")

if len(missing_fields) > 0.5 * len(fields):
self.logger.info(f"This record may require more fields to be mapped. Missing fields: {missing_fields}")

if record.get("Id") or record.get("id"):
object_id = record.pop("Id") or record.pop("id")
url = "/".join([endpoint, object_id])
try:
if record.get("Id") or record.get("id"):
object_id = record.pop("Id") or record.pop("id")
url = "/".join([endpoint, object_id])
response = self.request_api("PATCH", endpoint=url, request_data=record)
if response.status_code == 204:
self.logger.info(f"{object_type} updated with id: {object_id}")
return object_id, True, state_updates

id = response.json().get("id")
self.logger.info(f"{object_type} updated with id: {id}")
return id, True, state_updates
except Exception as e:
self.logger.exception(f"Error encountered while updating {object_type}")

if len(possible_update_fields) > 0:
for id_field in possible_update_fields:
try:
url = "/".join([endpoint, id_field, record.get(id_field)])
response = self.request_api("PATCH", endpoint=url, request_data={k: record[k] for k in set(list(record.keys())) - set([id_field])})
else:
id = response.json().get("id")
self.logger.info(f"{object_type} updated with id: {id}")
return id, True, state_updates
except Exception as e:
self.logger.exception(f"Could not PATCH to {url}: {e}")

try:
if len(possible_update_fields) > 0:
for id_field in possible_update_fields:
try:
url = "/".join([endpoint, id_field, record.get(id_field)])
response = self.request_api("PATCH", endpoint=url, request_data={k: record[k] for k in set(list(record.keys())) - set([id_field])})
id = response.json().get("id")
self.logger.info(f"{object_type} updated with id: {id}")
return id, True, state_updates
except Exception as e:
self.logger.exception(f"Could not PATCH to {url}: {e}")

if len(possible_update_fields) > 0:
self.logger.info("Failed to find updatable entity, trying to create it.")

response = self.request_api("POST", endpoint=endpoint, request_data=record)
id = response.json().get("id")
self.logger.info(f"{object_type} created with id: {id}")
return id, True, state_updates
except Exception as e:
self.logger.exception(f"Error encountered while creating {object_type}")
raise e

return None, False, {}
6 changes: 6 additions & 0 deletions target_salesforce_v3/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from singer_sdk import typing as th
from target_hotglue.target import TargetHotglue
import copy

from target_salesforce_v3.sinks import (
FallbackSink,
Expand Down Expand Up @@ -34,6 +35,11 @@ class TargetSalesforceV3(TargetHotglue):
name = "target-salesforce-v3"
MAX_PARALLELISM = 10
SINK_TYPES = SINK_TYPES
sobjects = {}
current_sink_name = None
current_fields = {}
hit_rate_limit = False

def get_sink_class(self, stream_name: str):
"""Get sink for a stream."""
for sink_class in SINK_TYPES:
Expand Down
Loading