Write output metadata to manifest.json
The main motivation for this was to allow airlock to access commit
data for level 4 outputs.

However, it was simple to extend this idea to include a) all outputs and
b) much more metadata, like job, action, size, timestamp, content hash,
and whether a file has been excluded from level 4 and why. This data is
useful for airlock, but also potentially for other systems and general
reporting.
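
For illustration, the resulting metadata/manifest.json contains one entry per
output, shaped roughly like this (the keys come from get_output_metadata and
read_manifest_file below; the values here are invented):

manifest = {
    "workspace": "my-workspace",  # invented name
    "repo": None,  # old key, no longer needed
    "outputs": {
        "output/summary.csv": {
            "level": "moderately_sensitive",
            "job_id": "abcd1234",  # invented id
            "job_request": "efgh5678",  # invented id
            "action": "summarise",  # invented action name
            "commit": "0f1e2d3c",  # invented commit sha
            "size": 0,
            "timestamp": 1713955200.0,  # st_mtime of the file
            "content_hash": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
            "excluded": False,
            "message": None,
        },
    },
}
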
bloodearnest committed Apr 24, 2024
1 parent 22f9fd5 commit c29ed94
Showing 3 changed files with 181 additions and 37 deletions.
129 changes: 92 additions & 37 deletions jobrunner/executors/local.py
@@ -22,7 +22,7 @@
    JobStatus,
    Privacy,
)
-from jobrunner.lib import datestr_to_ns_timestamp, docker
+from jobrunner.lib import datestr_to_ns_timestamp, docker, file_digest
from jobrunner.lib.git import checkout_commit
from jobrunner.lib.path_utils import list_dir_with_ignore_patterns
from jobrunner.lib.string_utils import tabulate
@@ -493,53 +493,93 @@ def persist_outputs(job_definition, outputs, job_metadata):
    # Extract outputs to workspace
    workspace_dir = get_high_privacy_workspace(job_definition.workspace)

-    excluded_files = {}
+    excluded_job_msgs = {}
+    excluded_file_msgs = {}

    sizes = {}
-    for filename in outputs.keys():
+    # copy all files into workspace long term storage
+    for filename, level in outputs.items():
        log.info(f"Extracting output file: {filename}")
-        size = volumes.get_volume_api(job_definition).copy_from_volume(
-            job_definition, filename, workspace_dir / filename
+        dst = workspace_dir / filename
+        sizes[filename] = volumes.get_volume_api(job_definition).copy_from_volume(
+            job_definition, filename, dst
        )
-        sizes[filename] = size

-    # Copy out medium privacy files
+    l4_files = [
+        filename
+        for filename, level in outputs.items()
+        if level == "moderately_sensitive"
+    ]
+
+    # check any L4 files are valid
+    for filename in l4_files:
+        ok, job_msg, file_msg = check_l4_file(
+            job_definition, filename, sizes[filename], workspace_dir
+        )
+        if not ok:
+            excluded_job_msgs[filename] = job_msg
+            excluded_file_msgs[filename] = file_msg
+
    medium_privacy_dir = get_medium_privacy_workspace(job_definition.workspace)

-    for filename, privacy_level in outputs.items():
-        if privacy_level == "moderately_sensitive":
-            ok, job_msg, file_msg = check_l4_file(
-                job_definition, filename, sizes[filename], workspace_dir
-            )
+    # local run currently does not have a level 4 directory, so exit early
+    if not medium_privacy_dir:
+        return excluded_job_msgs
+
+    # Copy out medium privacy files to L4
+    for filename in l4_files:
+        src = workspace_dir / filename
+        dst = medium_privacy_dir / filename
+        message_file = medium_privacy_dir / (filename + ".txt")

-            if not ok:
-                excluded_files[filename] = job_msg
+        if filename in excluded_file_msgs:
+            message_file.parent.mkdir(exist_ok=True, parents=True)
+            message_file.write_text(excluded_file_msgs[filename])
+        else:
+            volumes.copy_file(src, dst)
+            # if it previously had a message, delete it
+            delete_files_from_directory(medium_privacy_dir, [message_file])

+    # Update manifest with file metadata
+    manifest = read_manifest_file(medium_privacy_dir, job_definition.workspace)
+
+    for filename, level in outputs.items():
+        abspath = workspace_dir / filename
+        manifest["outputs"][filename] = get_output_metadata(
+            abspath,
+            level,
+            job_id=job_definition.id,
+            job_request=job_definition.job_request_id,
+            action=job_definition.action,
+            commit=job_definition.study.commit,
+            excluded=filename in excluded_file_msgs,
+            message=excluded_job_msgs.get(filename),
+        )
+
-        # local run currently does not have a level 4 directory
-        if medium_privacy_dir:
-            message_file = medium_privacy_dir / (filename + ".txt")
+    write_manifest_file(medium_privacy_dir, manifest)

-            if ok:
-                volumes.copy_file(
-                    workspace_dir / filename, medium_privacy_dir / filename
-                )
-                # if it previously had a too big notice, delete it
-                delete_files_from_directory(medium_privacy_dir, [message_file])
-            else:
-                message_file.parent.mkdir(exist_ok=True, parents=True)
-                message_file.write_text(file_msg)
-
-    # this can be removed once osrelease is dead
-    write_manifest_file(
-        medium_privacy_dir,
-        {
-            # this currently needs to exist, but is not used
-            "repo": None,
-            "workspace": job_definition.workspace,
-        },
-    )
+    return excluded_job_msgs

-    return excluded_files

+def get_output_metadata(
+    abspath, level, job_id, job_request, action, commit, excluded, message=None
+):
+    stat = abspath.stat()
+    with abspath.open("rb") as fp:
+        content_hash = file_digest(fp, "sha256").hexdigest()
+
+    return {
+        "level": level,
+        "job_id": job_id,
+        "job_request": job_request,
+        "action": action,
+        "commit": commit,
+        "size": stat.st_size,
+        "timestamp": stat.st_mtime,
+        "content_hash": content_hash,
+        "excluded": excluded,
+        "message": message,
+    }
+

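To make the exclusion bookkeeping above concrete, here is a minimal sketch with
invented data: a level 4 file that fails check_l4_file gets a <filename>.txt
explainer in place of a copy, and the reason is also recorded in the manifest.

# Sketch only: invented filenames and messages, mirroring the loops above.
outputs = {
    "output/counts.csv": "moderately_sensitive",
    "output/raw.csv": "highly_sensitive",
}
excluded_file_msgs = {"output/counts.csv": "File has patient_id column"}

for filename, level in outputs.items():
    if level != "moderately_sensitive":
        continue  # only level 4 outputs are copied out
    if filename in excluded_file_msgs:
        print(f"write {filename}.txt: {excluded_file_msgs[filename]}")
    else:
        print(f"copy {filename} to the level 4 workspace")
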
MAX_SIZE_MSG = """
@@ -804,6 +844,21 @@ def redact_environment_variables(container_metadata):
    container_metadata["Config"]["Env"] = redacted_vars


+def read_manifest_file(workspace_dir, workspace):
+    manifest_file = workspace_dir / METADATA_DIR / MANIFEST_FILE
+
+    if manifest_file.exists():
+        manifest = json.loads(manifest_file.read_text())
+        manifest.setdefault("outputs", {})
+        return manifest
+
+    return {
+        "workspace": workspace,
+        "repo": None,  # old key, no longer needed
+        "outputs": {},
+    }
+
+
def write_manifest_file(workspace_dir, manifest):
    manifest_file = workspace_dir / METADATA_DIR / MANIFEST_FILE
    manifest_file.parent.mkdir(exist_ok=True, parents=True)
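
Because the manifest is plain JSON, a consumer such as airlock can read it
without importing jobrunner. A minimal sketch, assuming METADATA_DIR and
MANIFEST_FILE resolve to metadata/ and manifest.json as used above
(load_output_metadata, excluded_outputs and workspace_root are illustrative
names):

import json
from pathlib import Path


def load_output_metadata(workspace_root):
    # Assumed layout: <workspace>/metadata/manifest.json
    manifest_path = Path(workspace_root) / "metadata" / "manifest.json"
    manifest = json.loads(manifest_path.read_text())
    return manifest.get("outputs", {})


def excluded_outputs(workspace_root):
    # Map each withheld level 4 file to the reason it was excluded.
    return {
        name: meta["message"]
        for name, meta in load_output_metadata(workspace_root).items()
        if meta["excluded"]
    }
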
48 changes: 48 additions & 0 deletions jobrunner/lib/__init__.py
@@ -3,6 +3,7 @@
import warnings
from contextlib import contextmanager
from datetime import datetime
+from hashlib import new


@contextmanager
@@ -23,6 +24,53 @@ def atomic_writer(dest):
        tmp.replace(dest)


+# port of python 3.11's file_digest
+def file_digest(fileobj, digest, /, *, _bufsize=2**18):
+    """Hash the contents of a file-like object. Returns a digest object.
+
+    *fileobj* must be a file-like object opened for reading in binary mode.
+    It accepts file objects from open(), io.BytesIO(), and SocketIO objects.
+    The function may bypass Python's I/O and use the file descriptor *fileno*
+    directly.
+
+    *digest* must either be a hash algorithm name as a *str*, a hash
+    constructor, or a callable that returns a hash object.
+    """
+    # On Linux we could use AF_ALG sockets and sendfile() to achieve zero-copy
+    # hashing with hardware acceleration.
+    if isinstance(digest, str):
+        digestobj = new(digest)
+    else:
+        digestobj = digest()
+
+    if hasattr(fileobj, "getbuffer"):
+        # io.BytesIO object, use zero-copy buffer
+        digestobj.update(fileobj.getbuffer())
+        return digestobj
+
+    # Only binary files implement readinto().
+    if not (
+        hasattr(fileobj, "readinto")
+        and hasattr(fileobj, "readable")
+        and fileobj.readable()
+    ):
+        raise ValueError(
+            f"'{fileobj!r}' is not a file-like object in binary reading mode."
+        )
+
+    # binary file, socket.SocketIO object
+    # Note: socket I/O uses different syscalls than file I/O.
+    buf = bytearray(_bufsize)  # Reusable buffer to reduce allocations.
+    view = memoryview(buf)
+    while True:
+        size = fileobj.readinto(buf)
+        if size == 0:
+            break  # EOF
+        digestobj.update(view[:size])
+
+    return digestobj


def datestr_to_ns_timestamp(datestr):
"""Parses a datestring with nanoseconds in it into integer ns timestamp.
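
For reference, the ported helper is called the same way as Python 3.11's
hashlib.file_digest; a minimal usage sketch (the path is illustrative,
mirroring get_output_metadata above):

from pathlib import Path

from jobrunner.lib import file_digest

path = Path("output/summary.csv")  # illustrative path
with path.open("rb") as f:
    print(file_digest(f, "sha256").hexdigest())
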
41 changes: 41 additions & 0 deletions tests/test_local_executor.py
@@ -368,6 +368,22 @@ def test_finalize_success(docker_cleanup, job_definition, tmp_work_dir, volume_a
    assert log_dir.exists()
    assert log_file.exists()
+
+    level4_dir = local.get_medium_privacy_workspace(job_definition.workspace)
+    manifest = local.read_manifest_file(level4_dir, job_definition.workspace)
+
+    metadata = manifest["outputs"]["output/summary.csv"]
+    assert metadata["level"] == "moderately_sensitive"
+    assert metadata["job_id"] == job_definition.id
+    assert metadata["job_request"] == job_definition.job_request_id
+    assert metadata["action"] == job_definition.action
+    assert metadata["commit"] == job_definition.study.commit
+    assert metadata["excluded"] is False
+    assert metadata["size"] == 0
+    assert (
+        metadata["content_hash"]
+        == "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
+    )
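
The expected content_hash here is simply the SHA-256 of empty content,
consistent with the size == 0 assertion:

import hashlib

assert hashlib.sha256(b"").hexdigest() == (
    "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
)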


@pytest.mark.needs_docker
def test_finalize_failed(docker_cleanup, job_definition, tmp_work_dir, volume_api):
@@ -550,12 +566,21 @@ def test_finalize_large_level4_outputs(

    if not local_run:
        level4_dir = local.get_medium_privacy_workspace(job_definition.workspace)
+
        message_file = level4_dir / "output/output.txt.txt"
        txt = message_file.read_text()
        assert "output/output.txt" in txt
        assert "1.0Mb" in txt
        assert "0.5Mb" in txt
+
+        manifest = local.read_manifest_file(level4_dir, job_definition.workspace)
+
+        assert manifest["outputs"]["output/output.txt"]["excluded"]
+        assert (
+            manifest["outputs"]["output/output.txt"]["message"]
+            == "File size of 1.0Mb is larger that limit of 0.5Mb."
+        )


@pytest.mark.needs_docker
def test_finalize_invalid_file_type(docker_cleanup, job_definition, tmp_work_dir):
@@ -594,6 +619,14 @@ def test_finalize_invalid_file_type(docker_cleanup, job_definition, tmp_work_dir
    assert "Invalid moderately_sensitive outputs:" in log
    assert "output/output.rds - File type of .rds is not valid level 4 file" in log
+
+    manifest = local.read_manifest_file(level4_dir, job_definition.workspace)
+
+    assert manifest["outputs"]["output/output.rds"]["excluded"]
+    assert (
+        manifest["outputs"]["output/output.rds"]["message"]
+        == "File type of .rds is not valid level 4 file"
+    )


@pytest.mark.needs_docker
def test_finalize_patient_id_header(
@@ -641,6 +674,14 @@ def test_finalize_patient_id_header(
    assert "output/output.csv" in txt
    assert "patient_id" in txt
+
+    manifest = local.read_manifest_file(level4_dir, job_definition.workspace)
+
+    assert manifest["outputs"]["output/output.csv"]["excluded"]
+    assert (
+        manifest["outputs"]["output/output.csv"]["message"]
+        == "File has patient_id column"
+    )


@pytest.mark.needs_docker
def test_finalize_large_level4_outputs_cleanup(