Skip to content

Commit

Permalink
Split parse_response into two methods
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronfriedman6 committed Apr 4, 2024
1 parent e59de79 commit 7e9721a
Showing 1 changed file with 60 additions and 39 deletions.
99 changes: 60 additions & 39 deletions lib/shoppertrak_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ def __init__(self, username, password):
self.today_str = datetime.now(pytz.timezone("US/Eastern")).date().isoformat()

def query(self, endpoint, date_str, query_count=1):
"""
Sends query to ShopperTrak API and returns the result as an XML root if
possible. If the API returns that it's busy, this method waits and recursively
calls itself.
"""
full_url = self.base_url + endpoint

self.logger.info(f"Querying {endpoint} for {date_str} data")
Expand Down Expand Up @@ -56,61 +61,71 @@ def query(self, endpoint, date_str, query_count=1):
)

def parse_response(self, xml_root, input_date_str, is_recovery_mode=False):
"""
Takes API response as an XML root and returns a list of dictionaries containing
result records
"""
rows = []
for site_xml in xml_root.findall("site"):
site_str = self._get_xml_str(site_xml, "siteID")
site_val = self._get_xml_str(site_xml, "siteID")
for date_xml in site_xml.findall("date"):
date_str = self._get_xml_str(date_xml, "dateValue")
if date_str != input_date_str:
date_val = self._get_xml_str(date_xml, "dateValue")
if date_val != input_date_str:
self.logger.error(
f"Request date does not match response date.\nRequest date: "
f"{input_date_str}\nResponse date: {date_str}"
f"{input_date_str}\nResponse date: {date_val}"
)
raise ShopperTrakApiClientError(
f"Request date does not match response date.\nRequest date: "
f"{input_date_str}\nResponse date: {date_str}"
f"{input_date_str}\nResponse date: {date_val}"
)
for entrance_xml in date_xml.findall("entrance"):
entrance_val = self._get_xml_str(entrance_xml, "name")
if entrance_val:
entrance_val = self._cast_str_to_int(entrance_val.lstrip("EP"))
for traffic_xml in entrance_xml.findall("traffic"):
enters = self._cast_str_to_int(
self._get_xml_str(traffic_xml, "enters")
result_row = self._form_row(
traffic_xml,
site_val,
date_val,
entrance_val,
is_recovery_mode,
)
exits = self._cast_str_to_int(
self._get_xml_str(traffic_xml, "exits")
)

start_time_str = self._get_xml_str(traffic_xml, "startTime")
start_dt_str = datetime.strptime(
date_str + " " + start_time_str, "%Y%m%d %H%M%S"
).isoformat()

code = self._get_xml_str(traffic_xml, "code")
is_healthy_orbit = code == "01"
if code != "01" and code != "02":
self.logger.warning(
f"Unknown code: '{code}'. Setting is_healthy_orbit to "
f"False"
)

if is_healthy_orbit or not is_recovery_mode:
rows.append(
{
"shoppertrak_site_id": site_str,
"orbit": entrance_val,
"increment_start": start_dt_str,
"enters": enters,
"exits": exits,
"is_healthy_orbit": is_healthy_orbit,
"is_recovery_data": is_recovery_mode,
"poll_date": self.today_str,
}
)
if result_row["is_healthy_orbit"] or not is_recovery_mode:
rows.append(result_row)
return rows

def _form_row(
self, traffic_xml, site_val, date_val, entrance_val, is_recovery_mode
):
"""Forms one result row out of various XML elements and values"""
start_time_str = self._get_xml_str(traffic_xml, "startTime")
start_dt_str = datetime.strptime(
date_val + " " + start_time_str, "%Y%m%d %H%M%S"
).isoformat()

status_code = self._get_xml_str(traffic_xml, "code")
if status_code != "01" and status_code != "02":
self.logger.warning(
f"Unknown code: '{status_code}'. Setting is_healthy_orbit to False"
)

return {
"shoppertrak_site_id": site_val,
"orbit": entrance_val,
"increment_start": start_dt_str,
"enters": self._cast_str_to_int(self._get_xml_str(traffic_xml, "enters")),
"exits": self._cast_str_to_int(self._get_xml_str(traffic_xml, "exits")),
"is_healthy_orbit": status_code == "01",
"is_recovery_data": is_recovery_mode,
"poll_date": self.today_str,
}

def _check_response(self, response_text):
"""
If API response is XML, does not contain an error, and contains at least one
traffic attribute, returns response as an XML root. Otherwise, throws an error.
"""
try:
root = ET.fromstring(response_text)
error = root.find("error")
Expand Down Expand Up @@ -139,13 +154,19 @@ def _check_response(self, response_text):
return root

def _get_xml_str(self, xml, attribute):
if not xml.get(attribute):
"""
Returns XML attribute as string and logs a warning if the attribute does not
exist or is empty
"""
attribute_str = (xml.get(attribute) or "").strip()
if not attribute_str:
self.logger.warning(f"Found blank '{attribute}'")
return None
else:
return xml.get(attribute).strip()
return attribute_str

def _cast_str_to_int(self, input_str):
"""Casts string to int and logs a warning if the string cannot be cast"""
if not input_str:
return None

Expand Down

0 comments on commit 7e9721a

Please sign in to comment.