diff --git a/CHANGELOG.md b/CHANGELOG.md
index 441b13f..d7dd9ca 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 2024-11-21 -- v1.0.5
+### Fixed
+- Catch non-fatal XML errors and continue making requests. Only throw an error and stop when the API limit has been exceeded, the request itself has failed, or the ShopperTrak server cannot be reached even after retrying.
+
## 2024-11-14 -- v1.0.4
### Fixed
- When site ID is not found (error code "E101"), skip it without throwing an error
diff --git a/lib/__init__.py b/lib/__init__.py
index 0bacca7..05785c1 100644
--- a/lib/__init__.py
+++ b/lib/__init__.py
@@ -1,4 +1,5 @@
from .shoppertrak_api_client import (
+ APIStatus,
ShopperTrakApiClient,
ShopperTrakApiClientError,
ALL_SITES_ENDPOINT,
diff --git a/lib/pipeline_controller.py b/lib/pipeline_controller.py
index da53bed..dfe516f 100644
--- a/lib/pipeline_controller.py
+++ b/lib/pipeline_controller.py
@@ -1,6 +1,5 @@
import os
import pytz
-import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from helpers.query_helper import (
@@ -12,6 +11,7 @@
REDSHIFT_RECOVERABLE_QUERY,
)
from lib import (
+ APIStatus,
ShopperTrakApiClient,
ALL_SITES_ENDPOINT,
SINGLE_SITE_ENDPOINT,
@@ -106,7 +106,10 @@ def get_location_hours_dict(self):
)
)
self.redshift_client.close_connection()
- return {(row[0], row[1]): (row[2], row[3]) for row in raw_hours}
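+ # Map (branch_code, weekday) to that day's (regular_open, regular_close) times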
+ return {
+ (branch_code, weekday): (regular_open, regular_close)
+ for branch_code, weekday, regular_open, regular_close in raw_hours
+ }
def process_all_sites_data(self, end_date, batch_num):
"""Gets visits data from all available sites for the given day(s)"""
@@ -114,19 +117,15 @@ def process_all_sites_data(self, end_date, batch_num):
poll_date = last_poll_date + timedelta(days=1)
if poll_date <= end_date:
self.logger.info(f"Beginning batch {batch_num+1}: {poll_date.isoformat()}")
- all_sites_xml_root = self.shoppertrak_api_client.query(
+ all_sites_response = self.shoppertrak_api_client.query(
ALL_SITES_ENDPOINT, poll_date
)
- if type(all_sites_xml_root) != ET.Element:
- self.logger.error(
- "Error querying ShopperTrak API for all sites visits data"
- )
- raise PipelineControllerError(
- "Error querying ShopperTrak API for all sites visits data"
- ) from None
+ if all_sites_response == APIStatus.ERROR:
+ self.logger.error("Failed to retrieve all sites visits data")
+ return
results = self.shoppertrak_api_client.parse_response(
- all_sites_xml_root, poll_date
+ all_sites_response, poll_date
)
encoded_records = self.avro_encoder.encode_batch(results)
if not self.ignore_kinesis:
@@ -165,8 +164,8 @@ def process_broken_orbits(self, start_date, end_date):
known_data_dict = dict()
if known_data:
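+ # Map (site_id, orbit, inc_start) to the previously stored Redshift row values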
known_data_dict = {
- (row[0], row[1], row[2]): (row[3], row[4], row[5], row[6])
- for row in known_data
+ (site_id, orbit, inc_start): (redshift_id, is_healthy, enters, exits)
+ for site_id, orbit, inc_start, redshift_id, is_healthy, enters, exits in known_data
}
self._recover_data(recoverable_site_dates, known_data_dict)
self.redshift_client.close_connection()
@@ -177,20 +176,15 @@ def _recover_data(self, site_dates, known_data_dict):
unhealthy data. Then check to see if the returned data is actually "recovered"
data, as it may have never been unhealthy to begin with. If so, send to Kinesis.
"""
- for row in site_dates:
- site_xml_root = self.shoppertrak_api_client.query(
- SINGLE_SITE_ENDPOINT + row[0], row[1]
+ for site_id, visits_date in site_dates:
+ site_response = self.shoppertrak_api_client.query(
+ SINGLE_SITE_ENDPOINT + site_id, visits_date
)
- # If the site ID can't be found (E101) or multiple sites match the same site
- # ID (E104), continue to the next site. If the API limit has been reached
- # (E107), stop.
- if site_xml_root == "E101" or site_xml_root == "E104":
- continue
- elif site_xml_root == "E107":
- break
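+ # A failed site is logged and skipped; the remaining sites are still queried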
+ if site_response == APIStatus.ERROR:
+ self.logger.error(f"Failed to retrieve site visits data for {site_id}")
else:
site_results = self.shoppertrak_api_client.parse_response(
- site_xml_root, row[1], is_recovery_mode=True
+ site_response, visits_date, is_recovery_mode=True
)
self._process_recovered_data(site_results, known_data_dict)
@@ -248,8 +242,3 @@ def _get_poll_date(self, batch_num):
else:
poll_str = self.s3_client.fetch_cache()["last_poll_date"]
return datetime.strptime(poll_str, "%Y-%m-%d").date()
-
-
-class PipelineControllerError(Exception):
- def __init__(self, message=None):
- self.message = message
diff --git a/lib/shoppertrak_api_client.py b/lib/shoppertrak_api_client.py
index 444882f..ec7240b 100644
--- a/lib/shoppertrak_api_client.py
+++ b/lib/shoppertrak_api_client.py
@@ -5,6 +5,7 @@
import xml.etree.ElementTree as ET
from datetime import datetime
+from enum import Enum
from nypl_py_utils.functions.log_helper import create_log
from requests.auth import HTTPBasicAuth
from requests.exceptions import RequestException
@@ -14,6 +15,15 @@
_WEEKDAY_MAP = {0: "Mon", 1: "Tue", 2: "Wed", 3: "Thu", 4: "Fri", 5: "Sat", 6: "Sun"}
+class APIStatus(Enum):
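+ """Represents the outcome of a single ShopperTrak API request"""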
+ SUCCESS = 1 # The API successfully retrieved the data
+ RETRY = 2 # The API is busy or down and should be retried later
+
+ # The API returned a request-specific error. This status indicates that while this
+ # request failed, others with different parameters may still succeed.
+ ERROR = 3
+
+
class ShopperTrakApiClient:
"""Class for querying the ShopperTrak API for location visits data"""
@@ -27,9 +37,9 @@ def __init__(self, username, password, location_hours_dict):
def query(self, endpoint, query_date, query_count=1):
"""
- Sends query to ShopperTrak API and returns the result as an XML root if
- possible. If the API returns that it's busy, this method waits and recursively
- calls itself.
+ Sends a query to the ShopperTrak API and either a) returns the result as an XML
+ root if the query succeeded, b) returns APIStatus.ERROR if the query failed but
+ others should still be attempted, or c) waits and tries again if the API was busy.
"""
full_url = self.base_url + endpoint
date_str = query_date.strftime("%Y%m%d")
@@ -48,23 +58,26 @@ def query(self, endpoint, query_date, query_count=1):
f"Failed to retrieve response from {full_url}: {e}"
) from None
- response_root = self._check_response(response.text)
- if response_root == "E108" or response_root == "E000":
+ response_status, response_root = self._check_response(response.text)
+ if response_status == APIStatus.SUCCESS:
+ return response_root
+ elif response_status == APIStatus.ERROR:
+ return response_status
+ elif response_status == APIStatus.RETRY:
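+ # Wait before re-querying; query_count caps the total number of attempts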
if query_count < self.max_retries:
self.logger.info("Waiting 5 minutes and trying again")
time.sleep(300)
return self.query(endpoint, query_date, query_count + 1)
else:
self.logger.error(
- f"Reached max retries: sent {self.max_retries} queries with no "
- f"response"
+ f"Hit retry limit: sent {self.max_retries} queries with no response"
)
raise ShopperTrakApiClientError(
- f"Reached max retries: sent {self.max_retries} queries with no "
- f"response"
+ f"Hit retry limit: sent {self.max_retries} queries with no response"
)
else:
- return response_root
+ self.logger.error(f"Unknown API status: {response_status}")
+ raise ShopperTrakApiClientError(f"Unknown API status: {response_status}")
def parse_response(self, xml_root, input_date, is_recovery_mode=False):
"""
@@ -191,51 +204,33 @@ def _form_row(
def _check_response(self, response_text):
"""
- If API response is XML, does not contain an error, and contains at least one
- traffic attribute, returns response as an XML root. Otherwise, throws an error.
+ Checks the response for errors and returns an (APIStatus, XML root) pair, where
+ the root is None unless the status is SUCCESS. Throws an error only if the
+ daily API limit has been exceeded.
"""
try:
root = ET.fromstring(response_text)
error = root.find("error")
except ET.ParseError as e:
self.logger.error(f"Could not parse XML response {response_text}: {e}")
- raise ShopperTrakApiClientError(
- f"Could not parse XML response {response_text}: {e}"
- ) from None
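+ # Unparsable XML is treated as a request-specific failure rather than a fatal error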
+ return APIStatus.ERROR, None
if error is not None and error.text is not None:
- # E000 is used when ShopperTrak is down and they recommend trying again
- if error.text == "E000":
- self.logger.info("E000: ShopperTrak is down")
- return "E000"
- # E101 is used when the given site ID is not recognized
- elif error.text == "E101":
- self.logger.warning("E101: site ID not found")
- return "E101"
- # E104 is used when the given site ID matches multiple sites
- elif error.text == "E104":
- self.logger.warning("E104: site ID has multiple matches")
- return "E104"
# E107 is used when the daily API limit has been exceeded
- elif error.text == "E107":
- self.logger.info("E107: API limit exceeded")
- return "E107"
- # E108 is used when ShopperTrak is busy and they recommend trying again
- elif error.text == "E108":
- self.logger.info("E108: ShopperTrak is busy")
- return "E108"
+ if error.text == "E107":
+ self.logger.error("API limit exceeded")
+ raise ShopperTrakApiClientError("API limit exceeded")
+ # E000 is used when ShopperTrak is down and E108 is used when it's busy
+ elif error.text == "E000" or error.text == "E108":
+ self.logger.info("ShopperTrak is unavailable")
+ return APIStatus.RETRY, None
else:
self.logger.error(f"Error found in XML response: {response_text}")
- raise ShopperTrakApiClientError(
- f"Error found in XML response: {response_text}"
- )
+ return APIStatus.ERROR, None
elif len(root.findall(".//traffic")) == 0:
self.logger.error(f"No traffic found in XML response: {response_text}")
- raise ShopperTrakApiClientError(
- f"No traffic found in XML response: {response_text}"
- )
+ return APIStatus.ERROR, None
else:
- return root
+ return APIStatus.SUCCESS, root
def _get_xml_str(self, xml, attribute):
"""
diff --git a/requirements.txt b/requirements.txt
index 34a962a..6bb9378 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,3 @@
-nypl-py-utils[avro-client,kinesis-client,redshift-client,s3-client,config-helper]==1.4.0
+nypl-py-utils[avro-client,kinesis-client,redshift-client,s3-client,config-helper]==1.5.0
pytz
requests
\ No newline at end of file
diff --git a/tests/test_pipeline_controller.py b/tests/test_pipeline_controller.py
index 54a3637..1a04764 100644
--- a/tests/test_pipeline_controller.py
+++ b/tests/test_pipeline_controller.py
@@ -4,7 +4,8 @@
from datetime import date, datetime, time
from helpers.query_helper import REDSHIFT_DROP_QUERY, REDSHIFT_RECOVERABLE_QUERY
-from lib.pipeline_controller import PipelineController, PipelineControllerError
+from lib.pipeline_controller import PipelineController
+from lib.shoppertrak_api_client import APIStatus
from tests.test_helpers import TestHelpers
@@ -219,14 +220,15 @@ def test_process_all_sites_data_multi_run(self, test_instance, mock_logger, mock
]
)
- def test_process_all_sites_error(self, test_instance, mock_logger, mocker):
+ def test_process_all_sites_error(self, test_instance, mock_logger, mocker, caplog):
test_instance.s3_client.fetch_cache.return_value = {
"last_poll_date": "2023-12-30"}
- test_instance.shoppertrak_api_client.query.return_value = "error"
+ test_instance.shoppertrak_api_client.query.return_value = APIStatus.ERROR
- with pytest.raises(PipelineControllerError):
+ with caplog.at_level(logging.WARNING):
test_instance.process_all_sites_data(date(2023, 12, 31), 0)
+ assert "Failed to retrieve all sites visits data" in caplog.text
test_instance.s3_client.fetch_cache.assert_called_once()
test_instance.shoppertrak_api_client.query.assert_called_once_with(
"allsites", date(2023, 12, 31)
@@ -323,23 +325,24 @@ def test_recover_data(self, test_instance, mock_logger, mocker):
]
)
- def test_recover_data_bad_sites(self, test_instance, mock_logger, mocker):
+ def test_recover_data_error(self, test_instance, mocker, caplog):
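+ # The second query (site "bb") fails; the other two succeed and are parsed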
test_instance.shoppertrak_api_client.query.side_effect = [
- _TEST_XML_ROOT, "E104", "E101", _TEST_XML_ROOT]
+ _TEST_XML_ROOT, APIStatus.ERROR, _TEST_XML_ROOT]
mocked_process_recovered_data_method = mocker.patch(
"lib.pipeline_controller.PipelineController._process_recovered_data"
)
- test_instance._recover_data(
- [
- ("aa", date(2023, 12, 1)),
- ("bb", date(2023, 12, 1)),
- ("cc", date(2023, 12, 1)),
- ("aa", date(2023, 12, 2)),
- ],
- _TEST_KNOWN_DATA_DICT,
- )
-
+ with caplog.at_level(logging.WARNING):
+ test_instance._recover_data(
+ [
+ ("aa", date(2023, 12, 1)),
+ ("bb", date(2023, 12, 1)),
+ ("aa", date(2023, 12, 2)),
+ ],
+ _TEST_KNOWN_DATA_DICT,
+ )
+
+ assert "Failed to retrieve site visits data for bb" in caplog.text
test_instance.shoppertrak_api_client.parse_response.assert_has_calls(
[
mocker.call(_TEST_XML_ROOT, date(2023, 12, 1), is_recovery_mode=True),
@@ -347,27 +350,6 @@ def test_recover_data_bad_sites(self, test_instance, mock_logger, mocker):
]
)
assert mocked_process_recovered_data_method.call_count == 2
-
- def test_recover_data_api_limit(self, test_instance, mock_logger, mocker):
- test_instance.shoppertrak_api_client.query.side_effect = [
- _TEST_XML_ROOT, "E107"]
- mocked_process_recovered_data_method = mocker.patch(
- "lib.pipeline_controller.PipelineController._process_recovered_data"
- )
-
- test_instance._recover_data(
- [
- ("aa", date(2023, 12, 1)),
- ("bb", date(2023, 12, 2)),
- ("aa", date(2023, 12, 2)),
- ],
- _TEST_KNOWN_DATA_DICT,
- )
-
- test_instance.shoppertrak_api_client.parse_response.assert_called_once_with(
- _TEST_XML_ROOT, date(2023, 12, 1), is_recovery_mode=True
- )
- mocked_process_recovered_data_method.assert_called_once()
def test_process_recovered_data(self, test_instance, mocker, caplog):
mocked_update_query = mocker.patch(
diff --git a/tests/test_shoppertrak_api_client.py b/tests/test_shoppertrak_api_client.py
index 0c93868..b357c63 100644
--- a/tests/test_shoppertrak_api_client.py
+++ b/tests/test_shoppertrak_api_client.py
@@ -5,7 +5,7 @@
from copy import deepcopy
from datetime import date, time
-from lib import ShopperTrakApiClient, ShopperTrakApiClientError
+from lib import APIStatus, ShopperTrakApiClient, ShopperTrakApiClientError
from requests.exceptions import ConnectTimeout
from tests.test_helpers import TestHelpers
@@ -118,58 +118,38 @@ def test_query(self, test_instance, requests_mock, mocker):
xml_root = mocker.MagicMock()
mocked_check_response_method = mocker.patch(
- "lib.ShopperTrakApiClient._check_response", return_value=xml_root
+ "lib.ShopperTrakApiClient._check_response",
+ return_value=(APIStatus.SUCCESS, xml_root),
)
assert test_instance.query("test_endpoint", date(2023, 12, 31)) == xml_root
mocked_check_response_method.assert_called_once_with(_TEST_API_RESPONSE)
- def test_query_request_exception(self, test_instance, requests_mock):
+ def test_query_request_exception(self, test_instance, requests_mock, mocker, caplog):
requests_mock.get(
"https://test_shoppertrak_url/test_endpoint", exc=ConnectTimeout
)
+ mocked_check_response_method = mocker.patch(
+ "lib.ShopperTrakApiClient._check_response")
with pytest.raises(ShopperTrakApiClientError):
test_instance.query("test_endpoint", date(2023, 12, 31))
-
- def test_query_unrecognized_site(self, test_instance, requests_mock, mocker):
- requests_mock.get(
- "https://test_shoppertrak_url/test_endpoint"
- "?date=20231231&increment=15&total_property=N",
- text="Error 101",
- )
- mocked_check_response_method = mocker.patch(
- "lib.ShopperTrakApiClient._check_response", return_value="E101"
- )
-
- assert test_instance.query("test_endpoint", date(2023, 12, 31)) == "E101"
- mocked_check_response_method.assert_called_once_with("Error 101")
-
- def test_query_duplicate_sites(self, test_instance, requests_mock, mocker):
- requests_mock.get(
- "https://test_shoppertrak_url/test_endpoint"
- "?date=20231231&increment=15&total_property=N",
- text="Error 104",
- )
- mocked_check_response_method = mocker.patch(
- "lib.ShopperTrakApiClient._check_response", return_value="E104"
- )
-
- assert test_instance.query("test_endpoint", date(2023, 12, 31)) == "E104"
- mocked_check_response_method.assert_called_once_with("Error 104")
-
- def test_query_api_limit(self, test_instance, requests_mock, mocker):
+
+ assert ("Failed to retrieve response from "
+ "https://test_shoppertrak_url/test_endpoint") in caplog.text
+ mocked_check_response_method.assert_not_called()
+
+ def test_query_non_fatal_error(self, test_instance, requests_mock, mocker):
requests_mock.get(
"https://test_shoppertrak_url/test_endpoint"
"?date=20231231&increment=15&total_property=N",
- text="Error 107",
- )
- mocked_check_response_method = mocker.patch(
- "lib.ShopperTrakApiClient._check_response", return_value="E107"
+ text="error",
)
+ mocker.patch("lib.ShopperTrakApiClient._check_response",
+ return_value=(APIStatus.ERROR, None))
- assert test_instance.query("test_endpoint", date(2023, 12, 31)) == "E107"
- mocked_check_response_method.assert_called_once_with("Error 107")
+ assert test_instance.query(
+ "test_endpoint", date(2023, 12, 31)) == APIStatus.ERROR
def test_query_retry_success(self, test_instance, requests_mock, mocker):
mock_sleep = mocker.patch("time.sleep")
@@ -182,7 +162,8 @@ def test_query_retry_success(self, test_instance, requests_mock, mocker):
xml_root = mocker.MagicMock()
mocked_check_response_method = mocker.patch(
"lib.ShopperTrakApiClient._check_response",
- side_effect=["E000", "E108", xml_root],
+ side_effect=[(APIStatus.RETRY, None), (APIStatus.RETRY, None),
+ (APIStatus.SUCCESS, xml_root)],
)
assert test_instance.query("test_endpoint", date(2023, 12, 31)) == xml_root
@@ -194,7 +175,7 @@ def test_query_retry_success(self, test_instance, requests_mock, mocker):
)
assert mock_sleep.call_count == 2
- def test_query_retry_fail(self, test_instance, requests_mock, mocker):
+ def test_query_retry_fail(self, test_instance, requests_mock, mocker, caplog):
mock_sleep = mocker.patch("time.sleep")
requests_mock.get(
"https://test_shoppertrak_url/test_endpoint"
@@ -203,71 +184,91 @@ def test_query_retry_fail(self, test_instance, requests_mock, mocker):
)
mocked_check_response_method = mocker.patch(
"lib.ShopperTrakApiClient._check_response",
- side_effect=["E000", "E108", "E108"],
+ return_value=(APIStatus.RETRY, None),
)
with pytest.raises(ShopperTrakApiClientError):
test_instance.query("test_endpoint", date(2023, 12, 31))
+ assert "Hit retry limit: sent 3 queries with no response" in caplog.text
assert mocked_check_response_method.call_count == 3
assert mock_sleep.call_count == 2
- def test_check_response(self, test_instance):
- CHECKED_RESPONSE = test_instance._check_response(_TEST_API_RESPONSE)
- assert CHECKED_RESPONSE is not None
- assert CHECKED_RESPONSE != "E104"
- assert CHECKED_RESPONSE != "E107"
- assert CHECKED_RESPONSE != "E108"
- assert CHECKED_RESPONSE != "E000"
-
- def test_check_response_unrecognized_site(self, test_instance):
- assert test_instance._check_response(
- '<RESPONSE><error>E101</error><description>'
- 'The Customer Store ID supplied is not recognized by the system.'
- '</description></RESPONSE>'
- ) == "E101"
-
- def test_check_response_duplicate_site(self, test_instance):
- assert test_instance._check_response(
- '<RESPONSE><error>E104</error><description>The '
- 'Customer Store ID supplied has multiple matches.</description></RESPONSE>'
- ) == "E104"
-
- def test_check_response_api_limit(self, test_instance):
- assert test_instance._check_response(
- '<RESPONSE><error>E107</error><description>Customer '
- 'has exceeded the maximum number of requests allowed in a 24 hour period.'
- '</description></RESPONSE>'
- ) == "E107"
-
- def test_check_response_busy(self, test_instance):
- assert test_instance._check_response(
- '<RESPONSE><error>E108</error>'
- '<description>Server is busy</description></RESPONSE>'
- ) == "E108"
-
- def test_check_response_down(self, test_instance):
- assert test_instance._check_response(
- '<RESPONSE><error>E000</error>'
- '<description>Server is down</description></RESPONSE>'
- ) == "E000"
+ def test_query_bad_status(self, test_instance, requests_mock, mocker, caplog):
+ requests_mock.get(
+ "https://test_shoppertrak_url/test_endpoint"
+ "?date=20231231&increment=15&total_property=N",
+ text="",
+ )
+ mocker.patch("lib.ShopperTrakApiClient._check_response",
+ return_value=(None, None))
- def test_check_response_unparsable(self, test_instance):
with pytest.raises(ShopperTrakApiClientError):
- test_instance._check_response("bad xml")
+ test_instance.query("test_endpoint", date(2023, 12, 31))
+
+ assert "Unknown API status: None" in caplog.text
- def test_check_response_xml_error(self, test_instance):
+ def test_check_response(self, test_instance):
+ status, root = test_instance._check_response(_TEST_API_RESPONSE)
+ assert status == APIStatus.SUCCESS
+ assert isinstance(root, ET.Element)
+
+ def test_check_response_unparsable(self, test_instance, caplog):
+ with caplog.at_level(logging.ERROR):
+ status, root = test_instance._check_response("bad xml")
+
+ assert "Could not parse XML response bad xml" in caplog.text
+ assert status == APIStatus.ERROR
+ assert root is None
+
+ def test_check_response_api_limit(self, test_instance, caplog):
with pytest.raises(ShopperTrakApiClientError):
test_instance._check_response(
+ '<RESPONSE><error>E107</error><description>'
+ 'Customer has exceeded the maximum number of requests allowed in a 24 '
+ 'hour period.</description></RESPONSE>')
+ assert "API limit exceeded" in caplog.text
+
+ def test_check_response_down(self, test_instance, caplog):
+ with caplog.at_level(logging.WARNING):
+ status, root = test_instance._check_response(
+ '<RESPONSE><error>E000</error>'
+ '<description>Server is down</description></RESPONSE>')
+
+ assert caplog.text == ""
+ assert status == APIStatus.RETRY
+ assert root is None
+
+ def test_check_response_busy(self, test_instance, caplog):
+ with caplog.at_level(logging.WARNING):
+ status, root = test_instance._check_response(
+ '<RESPONSE><error>E108</error>'
+ '<description>Server is busy</description></RESPONSE>')
+
+ assert caplog.text == ""
+ assert status == APIStatus.RETRY
+ assert root is None
+
+ def test_check_response_xml_error(self, test_instance, caplog):
+ with caplog.at_level(logging.ERROR):
+ status, root = test_instance._check_response(
'<RESPONSE><error>E999</error>'
'<description>Error!</description></RESPONSE>')
- def test_check_response_no_traffic(self, test_instance):
- with pytest.raises(ShopperTrakApiClientError):
- test_instance._check_response(
+ assert "Error found in XML response:" in caplog.text
+ assert status == APIStatus.ERROR
+ assert root is None
+
+ def test_check_response_no_traffic(self, test_instance, caplog):
+ with caplog.at_level(logging.ERROR):
+ status, root = test_instance._check_response(
'<RESPONSE>'
'<site></site>'
'</RESPONSE>')
+
+ assert "No traffic found in XML response:" in caplog.text
+ assert status == APIStatus.ERROR
+ assert root is None
def test_parse_response(self, test_instance, caplog):
with caplog.at_level(logging.WARNING):