feat: Update to job-runner v2.74.0

rebkwok committed May 7, 2024
1 parent a9e4cd6 commit fb5b21c
Showing 24 changed files with 335 additions and 93 deletions.
2 changes: 1 addition & 1 deletion opensafely/_vendor/chardet-3.0.4.dist-info/RECORD
@@ -1,4 +1,4 @@
-../../bin/chardetect,sha256=rs8ryhcO6KK7HEbwaqQXMyJbs-rh0b3z7StE9WY6u1s,255
+../../bin/chardetect,sha256=Iim50Xo_A-6jNKXhng0od1YWXEr5OEzrv2MBb1TfLes,256
chardet-3.0.4.dist-info/DESCRIPTION.rst,sha256=PQ4sBsMyKFZkjC6QpmbpLn0UtCNyeb-ZqvCGEgyZMGk,2174
chardet-3.0.4.dist-info/INSTALLER,sha256=zuuue4knoyJ-UwPPXg8fezS7VCrXJQrAP7zeNuwvFQg,4
chardet-3.0.4.dist-info/METADATA,sha256=RV_2I4B1Z586DL8oVO5Kp7X5bUdQ5EuKAvNoAEF8wSw,3239
2 changes: 1 addition & 1 deletion opensafely/_vendor/distro-1.8.0.dist-info/RECORD
@@ -1,4 +1,4 @@
-../../bin/distro,sha256=29a3n43B_mVjn_fQOgyvUvaVNz2-44LC06SGtEopzvU,246
+../../bin/distro,sha256=LL7TkGdbIp5yO7jgBe9K5tlwiCJXMlG9hKhj5qmlETo,247
distro-1.8.0.dist-info/INSTALLER,sha256=zuuue4knoyJ-UwPPXg8fezS7VCrXJQrAP7zeNuwvFQg,4
distro-1.8.0.dist-info/LICENSE,sha256=y16Ofl9KOYjhBjwULGDcLfdWBfTEZRXnduOspt-XbhQ,11325
distro-1.8.0.dist-info/METADATA,sha256=NhYw94UPXb78_Z3_VtLxTJ1zQgUUKoTndg10uKJX800,6915
94 changes: 94 additions & 0 deletions opensafely/_vendor/jobrunner/cli/manifests.py
@@ -0,0 +1,94 @@
"""
Ops utility for backfilling manifest.json files from db
"""
import argparse

from opensafely._vendor.jobrunner.executors import local
from opensafely._vendor.jobrunner.lib import database
from opensafely._vendor.jobrunner.models import Job


def main(workspaces=None):
conn = database.get_connection()
workspaces = [
w["workspace"] for w in conn.execute("SELECT DISTINCT(workspace) FROM job;")
]

n_workspaces = len(workspaces)
for i, workspace in enumerate(workspaces):
print(f"workspace {i+1}/{n_workspaces}: {workspace}")

level4_dir = local.get_medium_privacy_workspace(workspace)

sentinel = level4_dir / ".manifest-backfill"
if sentinel.exists():
print(" - already done, skipping")
continue

write_manifest(workspace)

sentinel.touch()


def write_manifest(workspace):
conn = database.get_connection()
workspace_dir = local.get_high_privacy_workspace(workspace)
level4_dir = local.get_medium_privacy_workspace(workspace)

# ordering by most recent ensures we find the job that generated the
# current version of the file.
job_ids = conn.execute(
"""
SELECT id FROM job
WHERE workspace = ?
AND outputs != ''
AND completed_at IS NOT NULL
AND state = 'succeeded'
ORDER BY completed_at DESC;
""",
(workspace,),
)

outputs = {}

for row in job_ids:
job_id = row["id"]
job = database.find_one(Job, id=job_id)

for output, level in job.outputs.items():
if output in outputs:
# older version of the file, ignore
continue

abspath = workspace_dir / output

# use presence of message file to detect excluded files
message_file = level4_dir / (output + ".txt")
excluded = message_file.exists()

metadata = local.get_output_metadata(
abspath,
level,
job_id=job_id,
job_request=job.job_request_id,
action=job.action,
commit=job.commit,
excluded=excluded,
)

outputs[output] = metadata

manifest = local.read_manifest_file(level4_dir, workspace)
manifest["outputs"] = outputs
print(f" - writing manifest for {workspace} with {len(outputs)} outputs")
local.write_manifest_file(level4_dir, manifest)


def run():
parser = argparse.ArgumentParser(description=__doc__.partition("\n\n")[0])
args = parser.parse_args()
main(**vars(args))


if __name__ == "__main__":
run()
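
A hedged sketch of driving this backfill from Python rather than via run(); the module path follows the vendored layout above, and the sentinel check shown in main() makes re-runs safe:

from opensafely._vendor.jobrunner.cli import manifests

# Rebuild manifest.json for every workspace recorded in the job table.
# Workspaces already marked with a .manifest-backfill sentinel are skipped,
# so an interrupted backfill can simply be run again.
manifests.main()
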
1 change: 1 addition & 0 deletions opensafely/_vendor/jobrunner/config.py
@@ -145,6 +145,7 @@ def database_urls_from_env(env):
    os.environ.get("LEVEL4_MAX_FILESIZE", 16 * 1024 * 1024)
)  # 16mb

+LEVEL4_MAX_CSV_ROWS = int(os.environ.get("LEVEL4_MAX_CSV_ROWS", 5000))

LEVEL4_FILE_TYPES = pipeline.constants.LEVEL4_FILE_TYPES

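
As a sketch, the new row limit can be tuned through the environment; the value here is illustrative, and the override must happen before the config module is first imported:

import os

# Illustrative override: raise the limit from the default 5000 rows.
# Must be set before opensafely._vendor.jobrunner.config is imported.
os.environ["LEVEL4_MAX_CSV_ROWS"] = "10000"

from opensafely._vendor.jobrunner import config

assert config.LEVEL4_MAX_CSV_ROWS == 10000
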
176 changes: 136 additions & 40 deletions opensafely/_vendor/jobrunner/executors/local.py
@@ -22,7 +22,7 @@
    JobStatus,
    Privacy,
)
-from opensafely._vendor.jobrunner.lib import datestr_to_ns_timestamp, docker
+from opensafely._vendor.jobrunner.lib import datestr_to_ns_timestamp, docker, file_digest
from opensafely._vendor.jobrunner.lib.git import checkout_commit
from opensafely._vendor.jobrunner.lib.path_utils import list_dir_with_ignore_patterns
from opensafely._vendor.jobrunner.lib.string_utils import tabulate
@@ -493,53 +493,104 @@ def persist_outputs(job_definition, outputs, job_metadata):
    # Extract outputs to workspace
    workspace_dir = get_high_privacy_workspace(job_definition.workspace)

-    excluded_files = {}
+    excluded_job_msgs = {}
+    excluded_file_msgs = {}

    sizes = {}
-    for filename in outputs.keys():
+    # copy all files into workspace long term storage
+    for filename, level in outputs.items():
        log.info(f"Extracting output file: {filename}")
-        size = volumes.get_volume_api(job_definition).copy_from_volume(
-            job_definition, filename, workspace_dir / filename
+        dst = workspace_dir / filename
+        sizes[filename] = volumes.get_volume_api(job_definition).copy_from_volume(
+            job_definition, filename, dst
        )
-        sizes[filename] = size

-    # Copy out medium privacy files
+    l4_files = [
+        filename
+        for filename, level in outputs.items()
+        if level == "moderately_sensitive"
+    ]

+    csv_metadata = {}
+    # check any L4 files are valid
+    for filename in l4_files:
+        ok, job_msg, file_msg, csv_counts = check_l4_file(
+            job_definition, filename, sizes[filename], workspace_dir
+        )
+        if not ok:
+            excluded_job_msgs[filename] = job_msg
+            excluded_file_msgs[filename] = file_msg
+        csv_metadata[filename] = csv_counts
    medium_privacy_dir = get_medium_privacy_workspace(job_definition.workspace)

-    for filename, privacy_level in outputs.items():
-        if privacy_level == "moderately_sensitive":
-            ok, job_msg, file_msg = check_l4_file(
-                job_definition, filename, sizes[filename], workspace_dir
-            )
+    # local run currently does not have a level 4 directory, so exit early
+    if not medium_privacy_dir:
+        return excluded_job_msgs

-            if not ok:
-                excluded_files[filename] = job_msg
+    # Copy out medium privacy files to L4
+    for filename in l4_files:
+        src = workspace_dir / filename
+        dst = medium_privacy_dir / filename
+        message_file = medium_privacy_dir / (filename + ".txt")

-            # local run currently does not have a level 4 directory
-            if medium_privacy_dir:
-                message_file = medium_privacy_dir / (filename + ".txt")
+        if filename in excluded_file_msgs:
+            message_file.parent.mkdir(exist_ok=True, parents=True)
+            message_file.write_text(excluded_file_msgs[filename])
+        else:
+            volumes.copy_file(src, dst)
+            # if it previously had a message, delete it
+            delete_files_from_directory(medium_privacy_dir, [message_file])

+    # Update manifest with file metadata
+    manifest = read_manifest_file(medium_privacy_dir, job_definition.workspace)

+    for filename, level in outputs.items():
+        abspath = workspace_dir / filename
+        manifest["outputs"][filename] = get_output_metadata(
+            abspath,
+            level,
+            job_id=job_definition.id,
+            job_request=job_definition.job_request_id,
+            action=job_definition.action,
+            commit=job_definition.study.commit,
+            excluded=filename in excluded_file_msgs,
+            message=excluded_job_msgs.get(filename),
+            csv_counts=csv_metadata.get(filename),
+        )
+    write_manifest_file(medium_privacy_dir, manifest)

+    return excluded_job_msgs

-                if ok:
-                    volumes.copy_file(
-                        workspace_dir / filename, medium_privacy_dir / filename
-                    )
-                    # if it previously had a too big notice, delete it
-                    delete_files_from_directory(medium_privacy_dir, [message_file])
-                else:
-                    message_file.parent.mkdir(exist_ok=True, parents=True)
-                    message_file.write_text(file_msg)
-
-    # this can be removed once osrelease is dead
-    write_manifest_file(
-        medium_privacy_dir,
-        {
-            # this currently needs to exist, but is not used
-            "repo": None,
-            "workspace": job_definition.workspace,
-        },
-    )
-
-    return excluded_files
+def get_output_metadata(
+    abspath,
+    level,
+    job_id,
+    job_request,
+    action,
+    commit,
+    excluded,
+    message=None,
+    csv_counts=None,
+):
+    stat = abspath.stat()
+    with abspath.open("rb") as fp:
+        content_hash = file_digest(fp, "sha256").hexdigest()
+    csv_counts = csv_counts or {}
+    return {
+        "level": level,
+        "job_id": job_id,
+        "job_request": job_request,
+        "action": action,
+        "commit": commit,
+        "size": stat.st_size,
+        "timestamp": stat.st_mtime,
+        "content_hash": content_hash,
+        "excluded": excluded,
+        "message": message,
+        "row_count": csv_counts.get("rows"),
+        "col_count": csv_counts.get("cols"),
+    }
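
For illustration, a manifest entry produced by get_output_metadata for a moderately_sensitive CSV might look like this; the keys mirror the return value above, and every value below is hypothetical:

# Hypothetical entry stored under manifest["outputs"]["output/table.csv"]
example_entry = {
    "level": "moderately_sensitive",
    "job_id": "abcd1234efgh5678",
    "job_request": "request-0001",
    "action": "generate_table",
    "commit": "0123abcd",
    "size": 2048,
    "timestamp": 1715000000.0,
    "content_hash": "sha256-hex-digest-of-file-contents",  # placeholder value
    "excluded": False,
    "message": None,
    "row_count": 100,
    "col_count": 5,
}
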


MAX_SIZE_MSG = """
@@ -564,7 +615,7 @@ def persist_outputs(job_definition, outputs, job_metadata):
Level 4 files should be aggregate information easily viewable by output checkers.
-See available list of file types here: https://docs.opensafely.org/releasing-files/#allowed-file-types
+See available list of file types here: https://docs.opensafely.org/requesting-file-release/#allowed-file-types.
"""

PATIENT_ID = """
@@ -582,13 +633,26 @@ def persist_outputs(job_definition, outputs, job_metadata):
"""

+MAX_CSV_ROWS_MSG = """
+The file:
+{filename}
+contained {row_count} rows, which is above the limit for moderately_sensitive files of
+{limit} rows.
+As such, it has *not* been copied to Level 4 storage. Please contact tech-support for
+further assistance.
+"""


def check_l4_file(job_definition, filename, size, workspace_dir):
    def mb(b):
        return round(b / (1024 * 1024), 2)

    job_msgs = []
    file_msgs = []
+    csv_counts = {"rows": None, "cols": None}

    suffix = Path(filename).suffix
    if suffix not in config.LEVEL4_FILE_TYPES:
@@ -603,12 +667,29 @@ def mb(b):
            with actual_file.open() as f:
                reader = csv.DictReader(f)
                headers = reader.fieldnames
+                first_row = next(reader, None)
+                if first_row:
+                    csv_counts["cols"] = len(first_row)
+                    csv_counts["rows"] = sum(1 for _ in reader) + 1
+                else:
+                    csv_counts["cols"] = csv_counts["rows"] = 0
        except Exception:
            pass
        else:
            if headers and "patient_id" in headers:
                job_msgs.append("File has patient_id column")
                file_msgs.append(PATIENT_ID.format(filename=filename))
+            if csv_counts["rows"] > job_definition.level4_max_csv_rows:
+                job_msgs.append(
+                    f"File row count ({csv_counts['rows']}) exceeds maximum allowed rows ({job_definition.level4_max_csv_rows})"
+                )
+                file_msgs.append(
+                    MAX_CSV_ROWS_MSG.format(
+                        filename=filename,
+                        row_count=csv_counts["rows"],
+                        limit=job_definition.level4_max_csv_rows,
+                    )
+                )

    if size > job_definition.level4_max_filesize:
        job_msgs.append(
@@ -623,9 +704,9 @@ def mb(b):
        )

    if job_msgs:
-        return False, ",".join(job_msgs), "\n\n".join(file_msgs)
+        return False, ",".join(job_msgs), "\n\n".join(file_msgs), csv_counts
    else:
-        return True, None, None
+        return True, None, None, csv_counts


def find_matching_outputs(job_definition):
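
A hedged usage sketch of the new four-value return; job_definition, sizes, and workspace_dir are assumed to exist as in persist_outputs above:

# check_l4_file now also reports row/column counts for CSV outputs.
ok, job_msg, file_msg, csv_counts = check_l4_file(
    job_definition,
    "output/table.csv",
    size=sizes["output/table.csv"],
    workspace_dir=workspace_dir,
)
if not ok:
    print(job_msg)  # short summary, e.g. the row-count violation
print(csv_counts)   # {"rows": ..., "cols": ...}; values stay None for non-CSV files
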
@@ -804,6 +885,21 @@ def redact_environment_variables(container_metadata):
    container_metadata["Config"]["Env"] = redacted_vars


+def read_manifest_file(workspace_dir, workspace):
+    manifest_file = workspace_dir / METADATA_DIR / MANIFEST_FILE
+
+    if manifest_file.exists():
+        manifest = json.loads(manifest_file.read_text())
+        manifest.setdefault("outputs", {})
+        return manifest
+
+    return {
+        "workspace": workspace,
+        "repo": None,  # old key, no longer needed
+        "outputs": {},
+    }


def write_manifest_file(workspace_dir, manifest):
    manifest_file = workspace_dir / METADATA_DIR / MANIFEST_FILE
    manifest_file.parent.mkdir(exist_ok=True, parents=True)
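
A minimal sketch of the read-modify-write cycle these two helpers support; the directory and entry are assumed for illustration, and read_manifest_file returns a skeleton manifest when none exists yet, so this is safe on a fresh workspace:

from pathlib import Path

medium_privacy_dir = Path("/assumed/medium_privacy/my-workspace")  # hypothetical path

manifest = read_manifest_file(medium_privacy_dir, "my-workspace")
manifest["outputs"]["output/summary.csv"] = {"level": "moderately_sensitive"}
write_manifest_file(medium_privacy_dir, manifest)
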
3 changes: 2 additions & 1 deletion opensafely/_vendor/jobrunner/job_executor.py
@@ -30,12 +30,13 @@ class JobDefinition:
        str, str
    ]  # the files that the job should produce (globs mapped to privacy levels)
    allow_database_access: bool  # whether this job should have access to the database
+    level4_max_csv_rows: int
+    level4_max_filesize: int
    # our internal name for the database this job uses (actual connection details are
    # passed in `env`)
    database_name: str = None
    cpu_count: str = None  # number of CPUs to be allocated
    memory_limit: str = None  # memory limit to apply
-    level4_max_filesize: int = 16 * 1024 * 1024
    level4_file_types: list = field(default_factory=lambda: [".csv"])
    # if a job has been cancelled, the name of the canceller - either "user" or "admin"
    cancelled: str = None
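
Since both limits are now fields without defaults, anything constructing a JobDefinition must supply them explicitly. A small introspection sketch to confirm which fields are required; nothing here is part of the job-runner API:

import dataclasses

from opensafely._vendor.jobrunner.job_executor import JobDefinition

# Fields with neither a default nor a default_factory must be passed explicitly;
# level4_max_csv_rows and level4_max_filesize now appear in this list.
required = [
    f.name
    for f in dataclasses.fields(JobDefinition)
    if f.default is dataclasses.MISSING and f.default_factory is dataclasses.MISSING
]
print(required)
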