Merge pull request #617 from opensafely-core/evansd/job-scheduling
Use (hopefully) fairer job processing order
evansd authored Jul 14, 2023
2 parents 67af811 + eccbf0e commit 9acc49a
Showing 4 changed files with 31 additions and 22 deletions.
2 changes: 0 additions & 2 deletions jobrunner/cli/local_run.py
@@ -251,8 +251,6 @@ def create_and_run_jobs(
 ):
     # Fiddle with the configuration to suit what we need for running local jobs
     docker.LABEL = docker_label
-    # It's more helpful in this context to have things consistent
-    config.RANDOMISE_JOB_ORDER = False
     config.HIGH_PRIVACY_WORKSPACES_DIR = project_dir.parent
     config.DATABASE_FILE = project_dir / "metadata" / "db.sqlite"
     config.TMP_DIR = temp_dir
6 changes: 0 additions & 6 deletions jobrunner/config.py
@@ -127,12 +127,6 @@ def _is_valid_backend_name(name):
 MAX_DB_WORKERS = int(os.environ.get("MAX_DB_WORKERS") or MAX_WORKERS)
 MAX_RETRIES = int(os.environ.get("MAX_RETRIES", 0))
 
-# This is a crude mechanism for preventing a single large JobRequest with lots
-# of associated Jobs from hogging all the resources. We want this configurable
-# because it's useful to be able to disable this during tests and when running
-# locally
-RANDOMISE_JOB_ORDER = True
-
 
 STATA_LICENSE = os.environ.get("STATA_LICENSE")
 STATA_LICENSE_REPO = os.environ.get(
41 changes: 31 additions & 10 deletions jobrunner/run.py
@@ -3,10 +3,10 @@
 the appropriate action for each job depending on its current state, and then
 updates its state as appropriate.
 """
+import collections
 import datetime
 import logging
 import os
-import random
 import sys
 import time
 from typing import Optional
@@ -66,19 +66,40 @@ def handle_jobs(api: Optional[ExecutorAPI]):
     log.debug("Querying database for active jobs")
     active_jobs = find_where(Job, state__in=[State.PENDING, State.RUNNING])
     log.debug("Done query")
-    # Randomising the job order is a crude but effective way to ensure that a
-    # single large job request doesn't hog all the workers. We make this
-    # optional as, when running locally, having jobs run in a predictable order
-    # is preferable
-    if config.RANDOMISE_JOB_ORDER:
-        random.shuffle(active_jobs)
-
-    for job in active_jobs:
+
+    running_for_workspace = collections.defaultdict(int)
+    handled_jobs = []
+
+    while active_jobs:
+        # We need to re-sort on each loop because the number of running jobs per
+        # workspace will change as we work our way through
+        active_jobs.sort(
+            key=lambda job: (
+                # Process all running jobs first. Once we've processed all of these, the
+                # counts in `running_for_workspace` will be up-to-date.
+                0 if job.state == State.RUNNING else 1,
+                # Then process PENDING jobs in order of how many are running in the
+                # workspace. This gives a fairer allocation of capacity among
+                # workspaces.
+                running_for_workspace[job.workspace],
+                # Finally use job age as a tie-breaker
+                job.created_at,
+            )
+        )
+        job = active_jobs.pop(0)
+
         # `set_log_context` ensures that all log messages triggered anywhere
         # further down the stack will have `job` set on them
         with set_log_context(job=job):
             handle_single_job(job, api)
-    return active_jobs
+
+        # Add running jobs to the workspace count
+        if job.state == State.RUNNING:
+            running_for_workspace[job.workspace] += 1
+
+        handled_jobs.append(job)
+
+    return handled_jobs
 
 
 # we do not control the transition from these states, the executor does
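To make the new ordering concrete, here is a small self-contained sketch (not part of the commit) of the same sort-key idea. The Job dataclass, the string states and the sample data are invented stand-ins for jobrunner's real models; only the three-part sort key mirrors the code above.

import collections
from dataclasses import dataclass

RUNNING, PENDING = "running", "pending"


@dataclass
class Job:
    # Invented stand-in for jobrunner's Job model
    id: str
    workspace: str
    state: str
    created_at: int


active_jobs = [
    Job("a1", "workspace-a", PENDING, created_at=1),
    Job("a2", "workspace-a", PENDING, created_at=2),
    Job("a3", "workspace-a", RUNNING, created_at=3),
    Job("b1", "workspace-b", PENDING, created_at=4),
]

running_for_workspace = collections.defaultdict(int)
handled = []

while active_jobs:
    # Re-sort on every pass: running jobs first, then pending jobs from the
    # least busy workspace, with job age as the tie-breaker
    active_jobs.sort(
        key=lambda job: (
            0 if job.state == RUNNING else 1,
            running_for_workspace[job.workspace],
            job.created_at,
        )
    )
    job = active_jobs.pop(0)
    if job.state == RUNNING:
        running_for_workspace[job.workspace] += 1
    handled.append(job.id)

print(handled)  # ['a3', 'b1', 'a1', 'a2']

Because workspace-a already has a running job, workspace-b's pending job jumps ahead of workspace-a's older pending jobs, which is the fairer allocation the commit is aiming for.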
4 changes: 0 additions & 4 deletions tests/test_integration.py
@@ -55,8 +55,6 @@ def test_integration_with_cohortextractor(
     )
     # Disable repo URL checking so we can run using a local test repo
     monkeypatch.setattr("jobrunner.config.ALLOWED_GITHUB_ORGS", None)
-    # Make job execution order deterministic
-    monkeypatch.setattr("jobrunner.config.RANDOMISE_JOB_ORDER", False)
 
     if extraction_tool == "cohortextractor":
         image = "cohortextractor"
@@ -234,8 +232,6 @@ def test_integration_with_databuilder(
     )
     # Disable repo URL checking so we can run using a local test repo
     monkeypatch.setattr("jobrunner.config.ALLOWED_GITHUB_ORGS", None)
-    # Make job execution order deterministic
-    monkeypatch.setattr("jobrunner.config.RANDOMISE_JOB_ORDER", False)
 
     ensure_docker_images_present("databuilder:v0.36.0", "python")

