Retry json read if it fails, working around osparc io limitations (#10)
* Retry json read if it fails, working around osparc io limitations

* Fix validation client
wvangeit authored Aug 22, 2024
1 parent 6c6b713 commit 8d40b2a
Showing 2 changed files with 33 additions and 8 deletions.
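
The gist of the workaround: on osparc, an input JSON file can apparently become visible on disk before it has been completely written, so a one-shot `json.loads(path.read_text())` intermittently fails with `json.JSONDecodeError`. A minimal sketch of the failure mode and the retry pattern this commit adopts (the file name and 1-second interval are illustrative, not taken from the commit):

```python
import json
import pathlib
import time

path = pathlib.Path("input_tasks.json")  # illustrative name

# One-shot read: if the writer has not finished flushing the file yet,
# json.loads() raises json.JSONDecodeError on the partial content.
# data = json.loads(path.read_text())

# The workaround: assuming the file already exists, retry until it
# parses as complete, valid JSON.
while True:
    try:
        data = json.loads(path.read_text())
        break
    except json.JSONDecodeError:
        time.sleep(1)  # illustrative interval; the commit defaults to 5 s
```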
16 changes: 8 additions & 8 deletions docker_scripts/parallelrunner.py
@@ -111,7 +111,7 @@ def start(self):
         while True:
             tools.wait_for_path(self.input_tasks_path)
 
-            input_dict = json.loads(self.input_tasks_path.read_text())
+            input_dict = tools.load_json(self.input_tasks_path)
             command = input_dict["command"]
             caller_uuid = input_dict["caller_uuid"]
             map_uuid = input_dict["map_uuid"]
@@ -303,8 +303,8 @@ def process_job_outputs(self, results, batch, status):
                         zip_file,
                         at=output[probe_name]["filename"],
                     )
-                    file_results = json.loads(
-                        file_results_path.read_text()
+                    file_results = tools.load_json(
+                        file_results_path
                     )
 
                     output[probe_name]["value"] = file_results
@@ -372,8 +372,8 @@ def transform_batch_to_task_input(self, batch):
 
     def jobs_file_write_new(self, id, name, description, status):
         with self.lock:
-            jobs_statuses = json.loads(
-                self.settings.jobs_status_path.read_text()
+            jobs_statuses = tools.load_json(
+                self.settings.jobs_status_path
             )
             jobs_statuses[id] = {
                 "name": name,
@@ -386,8 +386,8 @@ def jobs_file_write_new(self, id, name, description, status):
 
     def jobs_file_write_status_change(self, id, status):
         with self.lock:
-            jobs_statuses = json.loads(
-                self.settings.jobs_status_path.read_text()
+            jobs_statuses = tools.load_json(
+                self.settings.jobs_status_path
             )
             jobs_statuses[id]["status"] = status
             self.settings.jobs_status_path.write_text(
@@ -509,7 +509,7 @@ def teardown(self):
     def read_keyvalues(self):
         """Read keyvalues file"""
 
-        keyvalues_unprocessed = json.loads(self.keyvalues_path.read_text())
+        keyvalues_unprocessed = tools.load_json(self.keyvalues_path)
         self.keyvalues_path.unlink()
 
         keyvalues = {}
25 changes: 25 additions & 0 deletions docker_scripts/tools.py
@@ -1,8 +1,18 @@
+import json
 import logging
 import pathlib as pl
+import time
 
 from watchdog.events import FileSystemEventHandler
 from watchdog.observers import Observer
 
+DEFAULT_FILE_POLLING_INTERVAL = 5
+
+logging.basicConfig(
+    level=logging.INFO, format="[%(filename)s:%(lineno)d] %(message)s"
+)
+logger = logging.getLogger(__name__)
+
+
 def wait_for_path(file_path):
     path = pl.Path(file_path).resolve()
@@ -41,3 +51,18 @@ def on_created(self, event):
     finally:
         observer.stop()
         observer.join()
+
+def load_json(file_path: pl.Path, wait=True,
+              file_polling_interval=DEFAULT_FILE_POLLING_INTERVAL):
+    if wait:
+        wait_for_path(file_path)
+
+    while True:
+        try:
+            content = json.loads(file_path.read_text())
+            break
+        except json.decoder.JSONDecodeError:
+            logger.info(f"JSON read error, retrying read from {file_path}")
+            time.sleep(file_polling_interval)
+
+    return content
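
For reference, a usage sketch of the new helper (the concrete file name below is illustrative; in this commit the callers pass paths such as `self.settings.jobs_status_path`):

```python
import pathlib as pl

import tools

# Default behaviour: wait for the file to appear, then retry parsing
# every DEFAULT_FILE_POLLING_INTERVAL (5 s) until the JSON is readable.
statuses = tools.load_json(pl.Path("jobs_status.json"))

# For a file that is known to exist already, skip the existence wait
# and poll faster.
statuses = tools.load_json(
    pl.Path("jobs_status.json"), wait=False, file_polling_interval=1
)
```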
