Skip to content

Commit

Permalink
Merge pull request #8 from ITISFoundation/add_job_timeout
Browse files Browse the repository at this point in the history
Add job timeout
  • Loading branch information
wvangeit authored Jul 30, 2024
2 parents eb88afb + 68be504 commit a6f1071
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.1.20
current_version = 0.2.1
commit = False
message = service version: {current_version} → {new_version}
tag = False
Expand Down
7 changes: 6 additions & 1 deletion .osparc/osparc-meta-parallelrunner/metadata.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: ParallelRunner
description: "ParallelRunnerService"
key: simcore/services/dynamic/osparc-meta-parallelrunner
version: 0.1.20
version: 0.2.1
integration-version: 2.0.0
type: dynamic
authors:
Expand Down Expand Up @@ -40,6 +40,11 @@ outputs:
label: Output values
description: Output files uploaded from the outputs folder
type: data:*/*
conf_json_schema:
displayOrder: 2.0
label: JSON schema
description: JSON schema of configuration file
type: data:*/*
boot-options:
boot_mode:
label: Boot mode
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ SHELL = /bin/sh
MAKEFLAGS += -j2

export DOCKER_IMAGE_NAME ?= osparc-meta-parallelrunner
export DOCKER_IMAGE_TAG ?= 0.1.20
export DOCKER_IMAGE_TAG ?= 0.2.1

export MASTER_AWS_REGISTRY ?= registry.osparc-master-zmt.click
export MASTER_REGISTRY ?= registry.osparc-master.speag.com
Expand Down Expand Up @@ -43,7 +43,7 @@ build: clean compose-spec ## build docker image
docker compose build

validation-clean:
sudo rm -rf validation-tmp
rm -rf validation-tmp
cp -r validation validation-tmp
chmod -R 770 validation-tmp

Expand Down
2 changes: 1 addition & 1 deletion docker-compose-local.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
services:
osparc-meta-parallelrunner:
image: simcore/services/dynamic/osparc-meta-parallelrunner:0.1.20
image: simcore/services/dynamic/osparc-meta-parallelrunner:0.2.1
ports:
- "8888:8888"
environment:
Expand Down
17 changes: 14 additions & 3 deletions docker_scripts/main.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import http.server
import json
import logging
import pathlib as pl
import socketserver
import threading
import time
import typing

import pydantic as pyda
import pydantic_settings
Expand All @@ -18,18 +18,28 @@

HTTP_PORT = 8888
INPUT_CONF_KEY = "input_3"
CONF_SCHEMA_KEY = "conf_json_schema"

FILE_POLLING_INTERVAL = 1 # second

MAX_JOB_CREATE_ATTEMPTS = 5
JOB_CREATE_ATTEMPTS_DELAY = 5
MAX_JOB_TRIALS = 5

JOB_TIMEOUT = None


def main():
"""Main"""

settings = MainSettings()
settings = ParallelRunnerMainSettings()
settings_schema = settings.model_json_schema()
logger.info(settings_schema)
conf_json_schema_path = (
settings.output_path / CONF_SCHEMA_KEY / "schema.json"
)
conf_json_schema_path.write_text(json.dumps(settings_schema, indent=2))

config_path = settings.input_path / INPUT_CONF_KEY / "parallelrunner.json"

http_dir_path = pl.Path(__file__).parent / "http"
Expand Down Expand Up @@ -71,7 +81,7 @@ def __init__(self, *args, **kwargs):
logger.error(f"{err} . Stopping %s", exc_info=True)


class MainSettings(pydantic_settings.BaseSettings):
class ParallelRunnerMainSettings(pydantic_settings.BaseSettings):
batch_mode: bool = False
file_polling_interval: int = FILE_POLLING_INTERVAL
input_path: pyda.DirectoryPath = pyda.Field(alias="DY_SIDECAR_PATH_INPUTS")
Expand All @@ -82,6 +92,7 @@ class MainSettings(pydantic_settings.BaseSettings):
file_polling_interval: int = FILE_POLLING_INTERVAL
max_job_create_attempts: int = MAX_JOB_CREATE_ATTEMPTS
job_create_attempts_delay: int = JOB_CREATE_ATTEMPTS_DELAY
job_timeout: None | float = JOB_TIMEOUT


if __name__ == "__main__":
Expand Down
15 changes: 13 additions & 2 deletions docker_scripts/parallelrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def __init__(
max_job_trials=None,
max_job_create_attempts=None,
job_create_attempts_delay=None,
job_timeout=None,
):
"""Constructor"""
self.test_mode = False
Expand All @@ -48,6 +49,7 @@ def __init__(
self.max_job_trials = max_job_trials
self.max_job_create_attempts = max_job_create_attempts
self.job_create_attempts_delay = job_create_attempts_delay
self.job_timeout = job_timeout

self.input_path = input_path # path where osparc write all our input
self.output_path = output_path # path where osparc write all our input
Expand Down Expand Up @@ -291,13 +293,13 @@ def run_job(self, job_inputs, input_batch):
"""Run a job with given inputs"""

logger.debug(f"Sending inputs: {job_inputs}")

if self.test_mode:
logger.info("Map in test mode, just returning input")

done_batch = self.process_job_outputs(
job_inputs, input_batch, "SUCCESS"
)
time.sleep(1)

return done_batch

Expand Down Expand Up @@ -443,7 +445,16 @@ def map_func(batch, trial_number=1):

job_inputs = self.create_job_inputs(task_input)

output_batch = self.run_job(job_inputs, batch)
with pathos.pools.ThreadPool(nodes=1) as timeout_pool:
output_batch_waiter = timeout_pool.apipe(
self.run_job, job_inputs, batch
)
output_batch = output_batch_waiter.get(
timeout=self.job_timeout
)
timeout_pool.close()
timeout_pool.join()
timeout_pool.clear() # Pool is singleton, need to clear old pool

self.n_of_finished_batches += 1
logger.info(
Expand Down
3 changes: 2 additions & 1 deletion validation/inputs/input_3/parallelrunner.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{
"batch_mode": true
"batch_mode": true,
"job_timeout": 60
}
Empty file.

0 comments on commit a6f1071

Please sign in to comment.