Skip to content

Commit

Permalink
Fix global eta (#13)
Browse files: browse the repository at this point in the history
* Fix global ETA in dashboard

* Bump version
  • Loading branch information
wvangeit authored Oct 25, 2024
1 parent afcfb73 commit 0594c67
Show file tree
Hide file tree
Showing 15 changed files with 144 additions and 44 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 1.1.8
current_version = 1.1.9
commit = False
message = service version: {current_version} → {new_version}
tag = False
Expand Down
2 changes: 1 addition & 1 deletion .osparc/osparc-meta-parallelrunner/metadata.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: ParallelRunner
description: "ParallelRunnerService"
key: simcore/services/dynamic/osparc-meta-parallelrunner
version: 1.1.8
version: 1.1.9
integration-version: 2.0.0
type: dynamic
authors:
Expand Down
6 changes: 4 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ RUN npm install -D tailwindcss@latest postcss@latest autoprefixer@latest
RUN npx tailwindcss init -p

WORKDIR /docker/http/server
RUN chown osparcuser:osparcuser jobs.json
RUN chown osparcuser:osparcuser jobs_settings.json
RUN chown osparcuser:osparcuser jobs_status.json

RUN npm install express
RUN npm install cors
Expand All @@ -40,6 +41,7 @@ RUN . ./venv/bin/activate && pip3 install -r /docker/requirements.txt

USER root
EXPOSE 8888
ENV JOBS_STATUS_PATH=/docker/http/server/jobs.json
ENV JOBS_SETTINGS_PATH=/docker/http/server/jobs_settings.json
ENV JOBS_STATUS_PATH=/docker/http/server/jobs_status.json

ENTRYPOINT [ "/bin/bash", "-c", "/docker/entrypoint.bash" ]
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ SHELL = /bin/sh
MAKEFLAGS += -j2

export DOCKER_IMAGE_NAME ?= osparc-meta-parallelrunner
export DOCKER_IMAGE_TAG ?= 1.1.8
export DOCKER_IMAGE_TAG ?= 1.1.9

export MASTER_AWS_REGISTRY ?= registry.osparc-master-zmt.click
export MASTER_REGISTRY ?= registry.osparc-master.speag.com
Expand Down
2 changes: 1 addition & 1 deletion docker-compose-local.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
services:
osparc-meta-parallelrunner:
image: simcore/services/dynamic/osparc-meta-parallelrunner:1.1.8
image: simcore/services/dynamic/osparc-meta-parallelrunner:1.1.9
ports:
- "8888:8888"
environment:
Expand Down
44 changes: 21 additions & 23 deletions docker_scripts/entrypoint.bash
Original file line number Diff line number Diff line change
Expand Up @@ -7,37 +7,35 @@ INFO="INFO: [$(basename "$0")] "

echo "$INFO" "Starting container for parallelrunner ..."

echo "$JOBS_STATUS_PATH"

HOST_USERID=$(stat -c %u "${DY_SIDECAR_PATH_INPUTS}")
HOST_GROUPID=$(stat -c %g "${DY_SIDECAR_PATH_INPUTS}")
CONTAINER_GROUPNAME=$(grep ":${HOST_GROUPID}:" /etc/group | cut -d: -f1 || echo "")

OSPARC_USER='osparcuser'

if [ "$HOST_USERID" -eq 0 ]; then
echo "Warning: Folder mounted owned by root user... adding $OSPARC_USER to root..."
addgroup "$OSPARC_USER" root
echo "Warning: Folder mounted owned by root user... adding $OSPARC_USER to root..."
addgroup "$OSPARC_USER" root
else
echo "Folder mounted owned by user $HOST_USERID:$HOST_GROUPID-'$CONTAINER_GROUPNAME'..."
# take host's credentials in $OSPARC_USER
if [ -z "$CONTAINER_GROUPNAME" ]; then
echo "Creating new group my$OSPARC_USER"
CONTAINER_GROUPNAME=my$OSPARC_USER
addgroup --gid "$HOST_GROUPID" "$CONTAINER_GROUPNAME"
else
echo "group already exists"
fi

echo "adding $OSPARC_USER to group $CONTAINER_GROUPNAME..."
addgroup "$OSPARC_USER" "$CONTAINER_GROUPNAME"

echo "changing owner ship of state directory /home/${OSPARC_USER}/work/workspace"
chown --recursive "$OSPARC_USER" "/home/${OSPARC_USER}/work/workspace"
echo "changing owner ship of state directory ${DY_SIDECAR_PATH_INPUTS}"
chown --recursive "$OSPARC_USER" "${DY_SIDECAR_PATH_INPUTS}"
echo "changing owner ship of state directory ${DY_SIDECAR_PATH_OUTPUTS}"
chown --recursive "$OSPARC_USER" "${DY_SIDECAR_PATH_OUTPUTS}"
echo "Folder mounted owned by user $HOST_USERID:$HOST_GROUPID-'$CONTAINER_GROUPNAME'..."
# take host's credentials in $OSPARC_USER
if [ -z "$CONTAINER_GROUPNAME" ]; then
echo "Creating new group my$OSPARC_USER"
CONTAINER_GROUPNAME=my$OSPARC_USER
addgroup --gid "$HOST_GROUPID" "$CONTAINER_GROUPNAME"
else
echo "group already exists"
fi

echo "adding $OSPARC_USER to group $CONTAINER_GROUPNAME..."
addgroup "$OSPARC_USER" "$CONTAINER_GROUPNAME"

echo "changing owner ship of state directory /home/${OSPARC_USER}/work/workspace"
chown --recursive "$OSPARC_USER" "/home/${OSPARC_USER}/work/workspace"
echo "changing owner ship of state directory ${DY_SIDECAR_PATH_INPUTS}"
chown --recursive "$OSPARC_USER" "${DY_SIDECAR_PATH_INPUTS}"
echo "changing owner ship of state directory ${DY_SIDECAR_PATH_OUTPUTS}"
chown --recursive "$OSPARC_USER" "${DY_SIDECAR_PATH_OUTPUTS}"
fi

exec su-exec "$OSPARC_USER" /docker/main.bash
2 changes: 0 additions & 2 deletions docker_scripts/http/server/jobs.json

This file was deleted.

1 change: 1 addition & 0 deletions docker_scripts/http/server/jobs_settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{}
1 change: 1 addition & 0 deletions docker_scripts/http/server/jobs_status.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{}
21 changes: 12 additions & 9 deletions docker_scripts/http/server/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ const port = 8888;
app.use(cors());
app.use(express.json());

const dataFile = path.join(__dirname, 'jobs.json');
const jobsstatusFile = path.join(__dirname, 'jobs_status.json');
const jobssettingsFile = path.join(__dirname, 'jobs_settings.json');

const calculateCompletionTime = (job) => {
return job.endTime && job.startTime
Expand Down Expand Up @@ -42,9 +43,11 @@ const calculateRunningTimeAndETA = (job, finishedJobs) => {

app.get('/api/jobs', async (req, res) => {
try {
const data = await fs.readFile(dataFile, 'utf8');
const data = await fs.readFile(jobsstatusFile, 'utf8');
const jobs = JSON.parse(data);


// jobs_settings.json starts out as `{}` and only gains `number_of_workers`
// once the runner writes it. Dividing an ETA by `undefined` yields NaN, which
// would poison the overall ETA, so fall back to a single worker whenever the
// value is missing or not a positive number.
const jobsSettings = JSON.parse(await fs.readFile(jobssettingsFile, 'utf8'));
const configuredWorkers = Number(jobsSettings.number_of_workers);
const numberOfWorkers = Number.isFinite(configuredWorkers) && configuredWorkers > 0 ? configuredWorkers : 1;

// First, calculate completion times for finished jobs
const finishedJobs = Object.values(jobs)
.filter(job => job.status === 'done')
Expand All @@ -57,14 +60,14 @@ app.get('/api/jobs', async (req, res) => {
const updatedJobs = Object.entries(jobs).reduce((acc, [id, job]) => {
const updatedJob = calculateRunningTimeAndETA(job, finishedJobs);
acc[id] = updatedJob;

if (updatedJob.status === 'running' && updatedJob.eta !== null) {
totalRemainingTime += updatedJob.eta;
totalRemainingTime += updatedJob.eta / numberOfWorkers;
runningJobsCount++;
} else if (updatedJob.status === 'todo') {
todoJobsCount++;
}

return acc;
}, {});

Expand All @@ -75,7 +78,7 @@ app.get('/api/jobs', async (req, res) => {

// Add estimated time for todo jobs
if (averageCompletionTime !== null) {
totalRemainingTime += todoJobsCount * averageCompletionTime;
totalRemainingTime += todoJobsCount * averageCompletionTime / numberOfWorkers;
}

const overallETA = (runningJobsCount > 0 || todoJobsCount > 0) ? Math.floor(totalRemainingTime) : null;
Expand All @@ -95,7 +98,7 @@ app.put('/api/jobs/:id', async (req, res) => {
const { status } = req.body;

try {
const data = await fs.readFile(dataFile, 'utf8');
const data = await fs.readFile(jobsstatusFile, 'utf8');
const jobs = JSON.parse(data);

if (!jobs[id]) {
Expand All @@ -118,7 +121,7 @@ app.put('/api/jobs/:id', async (req, res) => {

const updatedJob = calculateRunningTimeAndETA(jobs[id], finishedJobs);

await fs.writeFile(dataFile, JSON.stringify(jobs, null, 2));
await fs.writeFile(jobsstatusFile, JSON.stringify(jobs, null, 2));
res.json(updatedJob);
} catch (err) {
console.error('Error updating job:', err);
Expand Down
4 changes: 4 additions & 0 deletions docker_scripts/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(self):
# Hide some settings from the user
for field_name in [
"max_number_of_workers",
"JOBS_SETTINGS_PATH",
"JOBS_STATUS_PATH",
"DY_SIDECAR_PATH_INPUTS",
"DY_SIDECAR_PATH_OUTPUTS",
Expand Down Expand Up @@ -115,6 +116,9 @@ class ParallelRunnerMainSettings(pydantic_settings.BaseSettings):
default=DEFAULT_JOB_CREATE_ATTEMPTS_DELAY, gt=0
)
job_timeout: float = pyda.Field(default=DEFAULT_JOB_TIMEOUT, ge=0)
jobs_settings_path: pyda.FilePath = pyda.Field(
alias="JOBS_SETTINGS_PATH"
)
jobs_status_path: pyda.FilePath = pyda.Field(alias="JOBS_STATUS_PATH")


Expand Down
23 changes: 21 additions & 2 deletions docker_scripts/parallelrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ def start(self):

def run_input_tasks(self, input_tasks, tasks_uuid):
number_of_workers = self.settings.number_of_workers
self.jobs_settings_file_write(number_of_workers)

batch_mode = self.settings.batch_mode

if batch_mode:
Expand Down Expand Up @@ -376,16 +378,31 @@ def transform_batch_to_task_input(self, batch):
else:
if param_name in task_input:
raise ParallelRunner.FatalException(
"Can only handle multiple value of FileJSON in "
"one batch, received several "
"Can only handle multiple values of FileJSON in "
"one batch, not other input types, received "
"several values of type "
f"{param_type} for {param_name}"
)
else:
task_input[param_name] = param_input

return task_input

def jobs_settings_file_write(self, number_of_workers):
    """Write json file with number of workers for GUI

    The settings file is replaced with a JSON object holding a single
    ``number_of_workers`` key.

    Args:
        number_of_workers: value stored under ``number_of_workers``.
    """

    with self.lock:
        # The file is always overwritten in full, so its previous contents
        # are not read first; this also lets the write succeed when the
        # existing file is empty or holds invalid JSON.
        jobs_settings = {
            "number_of_workers": number_of_workers,
        }
        self.settings.jobs_settings_path.write_text(
            json.dumps(jobs_settings)
        )

def jobs_file_write_new(self, id, name, description, status):
"""Add new job to job status file for GUI"""

with self.lock:
jobs_statuses = tools.load_json(self.settings.jobs_status_path)
jobs_statuses[id] = {
Expand All @@ -398,6 +415,8 @@ def jobs_file_write_new(self, id, name, description, status):
)

def jobs_file_write_status_change(self, id, status):
"""Update job in job status file for GUI"""

# Javascript current time
current_time = int(
datetime.datetime.now(tz=datetime.timezone.utc).timestamp() * 1000
Expand Down
1 change: 0 additions & 1 deletion validation-client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,5 @@ def main():
}
input_tasks_path.write_text(json.dumps(stop_command))


if __name__ == "__main__":
main()
75 changes: 75 additions & 0 deletions validation-client/input_tasks_template.json
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,81 @@
"type": "integer"
}
}
},
{
"input": {
"InputFile1": {
"type": "FileJSON",
"filename": "input.json",
"value": [
0.7119122781302056,
0.29588291546315737,
0.4652890454124229
]
}
},
"output": {
"OutputFile1": {
"type": "FileJSON",
"filename": "output.json"
},
"OutputNumber1": {
"type": "float"
},
"OutputInteger1": {
"type": "integer"
}
}
},
{
"input": {
"InputFile1": {
"type": "FileJSON",
"filename": "input.json",
"value": [
0.1119122781302056,
0.29588291546315737,
0.4652890454124229
]
}
},
"output": {
"OutputFile1": {
"type": "FileJSON",
"filename": "output.json"
},
"OutputNumber1": {
"type": "float"
},
"OutputInteger1": {
"type": "integer"
}
}
},
{
"input": {
"InputFile1": {
"type": "FileJSON",
"filename": "input.json",
"value": [
0.2119122781302056,
0.29588291546315737,
0.4652890454124229
]
}
},
"output": {
"OutputFile1": {
"type": "FileJSON",
"filename": "output.json"
},
"OutputNumber1": {
"type": "float"
},
"OutputInteger1": {
"type": "integer"
}
}
}
]
}
2 changes: 1 addition & 1 deletion validation/inputs/input_1/settings.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"template_id": "TEST_UUID",
"number_of_workers": 2,
"batch_mode": true,
"batch_mode": false,
"job_timeout": 60
}

0 comments on commit 0594c67

Please sign in to comment.