Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue2139: Using single pickle file to cache meta_map and smart_api information #2208

Merged
merged 3 commits into from
Nov 29, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 44 additions & 97 deletions code/ARAX/ARAXQuery/Expand/kp_info_cacher.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
'''
The kp_info_cacher.py file is a Python script that defines a class called KPInfoCacher.
This class is responsible for caching information about knowledge providers (KPs) used by the Reasoner API (TRAPI) service.
The cached information includes metadata about KPs and their APIs, as well as information about which KPs are currently available and which ones are down.
'''
import os
import pathlib
import pickle
Expand All @@ -23,17 +28,13 @@ def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs)
from smartapi import SmartAPI


MAX_TOTAL_WAIT_FOR_CACHE_SEC = 180.0
WAIT_LOOP_SEC = 0.1

class KPInfoCacher:

def __init__(self):
self.rtx_config = RTXConfiguration()
version_string = f"{self.rtx_config.trapi_major_version}--{self.rtx_config.maturity}"
self.smart_api_cache_path = f"{os.path.dirname(os.path.abspath(__file__))}/cache_smart_api_{version_string}.pkl"
self.meta_map_cache_path = f"{os.path.dirname(os.path.abspath(__file__))}/cache_meta_map_{version_string}.pkl"
self.cache_refresh_pid_path = f"{os.path.dirname(os.path.abspath(__file__))}/cache_refresh.pid"
self.smart_api_and_meta_map_cache = f"{os.path.dirname(os.path.abspath(__file__))}/cache_smart_api_and_meta_map_{version_string}.pkl"

def refresh_kp_info_caches(self):
"""
Expand All @@ -53,34 +54,34 @@ def refresh_kp_info_caches(self):
req_maturity=self.rtx_config.maturity)
if not smart_api_kp_registrations:
print(f"Didn't get any KP registrations back from SmartAPI!")
previous_smart_api_cache_exists = pathlib.Path(self.smart_api_cache_path).exists()
if smart_api_kp_registrations or not previous_smart_api_cache_exists:
previous_cache_exists = pathlib.Path(self.smart_api_and_meta_map_cache).exists()
if smart_api_kp_registrations or not previous_cache_exists:
# Transform the info into the format we want
allowed_kp_urls = {kp_registration["infores_name"]: self._get_kp_url_from_smartapi_registration(kp_registration)
for kp_registration in smart_api_kp_registrations}

smart_api_cache_contents = {"allowed_kp_urls": allowed_kp_urls,
"kps_excluded_by_version": smart_api_helper.kps_excluded_by_version,
"kps_excluded_by_maturity": smart_api_helper.kps_excluded_by_maturity}

# Save the SmartAPI info to the proper cache file in a thread-safe way (utilizing a temp file)
with open(f"{self.smart_api_cache_path}.tmp", "wb") as smart_api_cache_temp:
pickle.dump(smart_api_cache_contents, smart_api_cache_temp)
os.rename(f"{self.smart_api_cache_path}.tmp",
self.smart_api_cache_path)

else:
eprint(f"Keeping pre-existing SmartAPI cache since we got no results back from SmartAPI")
with open(self.smart_api_cache_path, "rb") as smart_api_file:
smart_api_cache_contents = pickle.load(smart_api_file)
with open(self.smart_api_and_meta_map_cache, "rb") as cache_file:
smart_api_cache_contents = pickle.load(cache_file)['smart_api_cache']

# Grab KPs' meta map info based off of their /meta_knowledge_graph endpoints
meta_map = self._build_meta_map(allowed_kps_dict=smart_api_cache_contents["allowed_kp_urls"])

# Save the meta map to the proper cache file in a thread-safe way (utilizing a temp file)
with open(f"{self.meta_map_cache_path}.tmp", "wb") as meta_map_cache_temp:
pickle.dump(meta_map, meta_map_cache_temp)
os.rename(f"{self.meta_map_cache_path}.tmp",
self.meta_map_cache_path)

common_cache = {
"smart_api_cache": smart_api_cache_contents,
"meta_map_cache": meta_map
}

with open(f"{self.smart_api_and_meta_map_cache}.tmp", "wb") as smart_api__and_meta_map_cache_temp:
pickle.dump(common_cache, smart_api__and_meta_map_cache_temp)
os.rename(f"{self.smart_api_and_meta_map_cache}.tmp",
self.smart_api_and_meta_map_cache)
eprint(f"The process with process ID {current_pid} has FINISHED refreshing the KP info caches")

except Exception as e:
Expand Down Expand Up @@ -121,9 +122,8 @@ def _get_kp_url_from_smartapi_registration(self, kp_smart_api_registration: dict
else:
return None

def both_caches_are_present(self):
return os.path.exists(self.smart_api_cache_path) and \
os.path.exists(self.meta_map_cache_path)
def cache_file_present(self):
return os.path.exists(self.smart_api_and_meta_map_cache)

def load_kp_info_caches(self, log: ARAXResponse):
"""
Expand All @@ -134,83 +134,30 @@ def load_kp_info_caches(self, log: ARAXResponse):
become corrupted while refreshing.
"""

refresher_pid_exists = os.path.exists(self.cache_refresh_pid_path)

if refresher_pid_exists:
# KP info cache is probably being regenerated
log.info("the KP info cache is being refreshed; waiting for it")
# if either pickled cache file is missing, then check if they are
# being generated (on the other hand, if both exist, just move on
# since we will use the cache files); see RTX issue 2072
# Check if the refresher PID file exists
try:
with open(self.cache_refresh_pid_path, "r") as f:
refresher_pid = int(f.read())
assert refresher_pid != os.getpid()
# Get the PID of the process that is currently refreshing
# the caches
# Check if the process is still running
iter_ctr = 0
while True:
# if the caches are being actively refreshed, wait for
# it to finish
time.sleep(WAIT_LOOP_SEC)
iter_ctr += 1

refresher_is_running = psutil.pid_exists(refresher_pid)
cache_files_present = self.both_caches_are_present()
caches_being_refreshed = refresher_is_running and \
(refresher_pid_exists or (not cache_files_present))

if not caches_being_refreshed:
if not refresher_is_running and \
refresher_pid_exists:
log.warning("Removing KP info cache refresher "
"PID file")
os.unlink(self.cache_refresh_pid_path)

if cache_files_present:
# the cache files have been updated
break
else:
raise Exception("kp_info_cacher file(s) are missing")

if WAIT_LOOP_SEC * iter_ctr > \
MAX_TOTAL_WAIT_FOR_CACHE_SEC:
raise Exception("Timed out waiting for SmartAPI " +
"cache creation; perhaps " +
"MAX_TOTAL_WAIT_FOR_CACHE_SEC " +
"value was too small: " +
f"{MAX_TOTAL_WAIT_FOR_CACHE_SEC}")

except FileNotFoundError as e:
# handle case where cache is already refreshed before we started the loop
if not self.both_caches_are_present():
raise Exception("cache refresher PID file missing but cache file missing")


# At this point the KP info caches must NOT be in the process of being
# refreshed, so we create/update if needed. In particular, this ensures
# that the caches will be created/fresh even on dev machines, that don't
# run the background refresh task.
one_day_ago = datetime.now() - timedelta(hours=24)
smart_api_info_cache_pathlib_path = pathlib.Path(self.smart_api_cache_path)
meta_map_cache_pathlib_path = pathlib.Path(self.meta_map_cache_path)
if not smart_api_info_cache_pathlib_path.exists() or not meta_map_cache_pathlib_path.exists():
log.warning(f"Missing KP info cache(s). Creating now.")
self.refresh_kp_info_caches()
elif (datetime.fromtimestamp(smart_api_info_cache_pathlib_path.stat().st_mtime) < one_day_ago or
datetime.fromtimestamp(meta_map_cache_pathlib_path.stat().st_mtime) < one_day_ago):
log.info(f"KP info cache(s) have not been updated for 24+ hours. Refreshing stale cache(s).")
self.refresh_kp_info_caches()

smart_api_and_meta_map_pathlib_path = pathlib.Path(self.smart_api_and_meta_map_cache)
try:
if not smart_api_and_meta_map_pathlib_path.exists():
raise Exception("KP info cache(s) do not exist.")
elif (datetime.fromtimestamp(smart_api_and_meta_map_pathlib_path.stat().st_mtime) < one_day_ago):
raise Exception("KP info cache(s) are older than 24 hours.")

except Exception as e:
log.error(f"Unable to load KP info caches: {e}")
raise e

# The caches MUST be up to date at this point, so we just load them
log.debug(f"Loading cached Smart API info")
with open(self.smart_api_cache_path, "rb") as smart_api_file:
smart_api_info = pickle.load(smart_api_file)
log.debug(f"Loading cached meta map")
with open(self.meta_map_cache_path, "rb") as map_file:
meta_map = pickle.load(map_file)
log.debug(f"Loading cached Smart API amd meta map info")
with open(self.smart_api_and_meta_map_cache, "rb") as cache:
cache = pickle.load(cache)
smart_api_info = cache['smart_api_cache']
meta_map = cache['meta_map_cache']


return smart_api_info, meta_map

Expand All @@ -219,10 +166,10 @@ def load_kp_info_caches(self, log: ARAXResponse):

def _build_meta_map(self, allowed_kps_dict: Dict[str, str]):
# Start with whatever pre-existing meta map we might already have (can use this info in case an API fails)
meta_map_file = pathlib.Path(self.meta_map_cache_path)
if meta_map_file.exists():
with open(self.meta_map_cache_path, "rb") as existing_meta_map_file:
meta_map = pickle.load(existing_meta_map_file)
cache_file = pathlib.Path(self.smart_api_and_meta_map_cache )
if cache_file.exists():
with open(self.smart_api_and_meta_map_cache, "rb") as cache:
meta_map = pickle.load(cache)['meta_map_cache']
else:
meta_map = dict()

Expand Down
Loading