diff --git a/docker_scripts/parallelrunner.py b/docker_scripts/parallelrunner.py
index cadfd6f..406e66c 100755
--- a/docker_scripts/parallelrunner.py
+++ b/docker_scripts/parallelrunner.py
@@ -111,7 +111,7 @@ def start(self):
         while True:
             tools.wait_for_path(self.input_tasks_path)
 
-            input_dict = json.loads(self.input_tasks_path.read_text())
+            input_dict = tools.load_json(self.input_tasks_path)
             command = input_dict["command"]
             caller_uuid = input_dict["caller_uuid"]
             map_uuid = input_dict["map_uuid"]
@@ -303,8 +303,8 @@ def process_job_outputs(self, results, batch, status):
                             zip_file,
                             at=output[probe_name]["filename"],
                         )
-                        file_results = json.loads(
-                            file_results_path.read_text()
+                        file_results = tools.load_json(
+                            file_results_path
                         )
 
                         output[probe_name]["value"] = file_results
@@ -372,8 +372,8 @@ def transform_batch_to_task_input(self, batch):
 
     def jobs_file_write_new(self, id, name, description, status):
         with self.lock:
-            jobs_statuses = json.loads(
-                self.settings.jobs_status_path.read_text()
+            jobs_statuses = tools.load_json(
+                self.settings.jobs_status_path
             )
             jobs_statuses[id] = {
                 "name": name,
@@ -386,8 +386,8 @@ def jobs_file_write_new(self, id, name, description, status):
 
     def jobs_file_write_status_change(self, id, status):
         with self.lock:
-            jobs_statuses = json.loads(
-                self.settings.jobs_status_path.read_text()
+            jobs_statuses = tools.load_json(
+                self.settings.jobs_status_path
             )
             jobs_statuses[id]["status"] = status
             self.settings.jobs_status_path.write_text(
@@ -509,7 +509,7 @@ def teardown(self):
 
     def read_keyvalues(self):
         """Read keyvalues file"""
-        keyvalues_unprocessed = json.loads(self.keyvalues_path.read_text())
+        keyvalues_unprocessed = tools.load_json(self.keyvalues_path)
         self.keyvalues_path.unlink()
 
         keyvalues = {}
diff --git a/docker_scripts/tools.py b/docker_scripts/tools.py
index 582f6b3..42b8396 100755
--- a/docker_scripts/tools.py
+++ b/docker_scripts/tools.py
@@ -1,8 +1,18 @@
+import json
+import logging
 import pathlib as pl
+import time
 
 from watchdog.events import FileSystemEventHandler
 from watchdog.observers import Observer
 
+DEFAULT_FILE_POLLING_INTERVAL = 5
+
+logging.basicConfig(
+    level=logging.INFO, format="[%(filename)s:%(lineno)d] %(message)s"
+)
+logger = logging.getLogger(__name__)
+
 
 def wait_for_path(file_path):
     path = pl.Path(file_path).resolve()
@@ -41,3 +51,19 @@ def on_created(self, event):
     finally:
         observer.stop()
         observer.join()
+
+def load_json(file_path: pl.Path, wait=True,
+              file_polling_interval=DEFAULT_FILE_POLLING_INTERVAL):
+    """Read and parse file_path, retrying until it contains valid JSON."""
+    if wait:
+        wait_for_path(file_path)
+
+    while True:
+        try:
+            content = json.loads(file_path.read_text())
+            break
+        except json.decoder.JSONDecodeError:
+            logger.info(f"JSON read error, retrying read from {file_path}")
+            time.sleep(file_polling_interval)
+
+    return content
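
The new tools.load_json helper closes a race in the previous json.loads(path.read_text()) pattern: a reader could open the file after the writer created it but before the writer finished, and crash on the partial document. Below is a minimal standalone sketch of the same retry-until-parseable pattern; the writer thread, file name, and helper name are illustrative only, not part of this patch:

    import json
    import pathlib
    import threading
    import time

    def load_json_with_retry(path: pathlib.Path, polling_interval=0.1):
        """Re-read path until it holds complete, parseable JSON."""
        while True:
            try:
                return json.loads(path.read_text())
            except (FileNotFoundError, json.JSONDecodeError):
                time.sleep(polling_interval)

    def slow_writer(path: pathlib.Path):
        """Simulate a non-atomic write that leaves partial JSON on disk."""
        with path.open("w") as f:
            f.write('{"status": "run')  # incomplete JSON is now visible
            f.flush()
            time.sleep(0.5)
            f.write('ning"}')  # document completes when the file closes

    if __name__ == "__main__":
        target = pathlib.Path("status.json")
        threading.Thread(target=slow_writer, args=(target,)).start()
        print(load_json_with_retry(target))  # {'status': 'running'}
        target.unlink()

Note that polling only narrows the window: if a writer truncates and rewrites an existing file, a reader can still parse a stale-but-valid document between retries. Writing to a temporary file and renaming it over the target (os.replace) is the usual fully atomic alternative.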