From c29ed94c154bb0d10a26e7befb4f75acd30a2093 Mon Sep 17 00:00:00 2001
From: bloodearnest
Date: Tue, 23 Apr 2024 17:38:15 +0100
Subject: [PATCH] Write output metadata to manifest.json

The main motivation for this was to allow airlock to access commit data
for level 4 outputs. However, it was simple to extend this idea to
include a) all outputs and b) much more metadata, such as job, action,
size, timestamp, content hash, and whether a file has been excluded from
level 4 and why.

This data is useful for airlock, but also potentially for other systems
and for general reporting.
---
 jobrunner/executors/local.py | 129 +++++++++++++++++++++++++----------
 jobrunner/lib/__init__.py    |  48 +++++++++++++
 tests/test_local_executor.py |  41 +++++++++++
 3 files changed, 181 insertions(+), 37 deletions(-)

diff --git a/jobrunner/executors/local.py b/jobrunner/executors/local.py
index d79cecc5..9a70eea3 100644
--- a/jobrunner/executors/local.py
+++ b/jobrunner/executors/local.py
@@ -22,7 +22,7 @@
     JobStatus,
     Privacy,
 )
-from jobrunner.lib import datestr_to_ns_timestamp, docker
+from jobrunner.lib import datestr_to_ns_timestamp, docker, file_digest
 from jobrunner.lib.git import checkout_commit
 from jobrunner.lib.path_utils import list_dir_with_ignore_patterns
 from jobrunner.lib.string_utils import tabulate
@@ -493,53 +493,93 @@ def persist_outputs(job_definition, outputs, job_metadata):
     # Extract outputs to workspace
     workspace_dir = get_high_privacy_workspace(job_definition.workspace)
 
-    excluded_files = {}
+    excluded_job_msgs = {}
+    excluded_file_msgs = {}
     sizes = {}
-    for filename in outputs.keys():
+    # copy all files into workspace long-term storage
+    for filename, level in outputs.items():
         log.info(f"Extracting output file: {filename}")
-        size = volumes.get_volume_api(job_definition).copy_from_volume(
-            job_definition, filename, workspace_dir / filename
+        dst = workspace_dir / filename
+        sizes[filename] = volumes.get_volume_api(job_definition).copy_from_volume(
+            job_definition, filename, dst
         )
-        sizes[filename] = size
 
-    # Copy out medium privacy files
+    l4_files = [
+        filename
+        for filename, level in outputs.items()
+        if level == "moderately_sensitive"
+    ]
+
+    # check that any L4 files are valid
+    for filename in l4_files:
+        ok, job_msg, file_msg = check_l4_file(
+            job_definition, filename, sizes[filename], workspace_dir
+        )
+        if not ok:
+            excluded_job_msgs[filename] = job_msg
+            excluded_file_msgs[filename] = file_msg
+
     medium_privacy_dir = get_medium_privacy_workspace(job_definition.workspace)
-    for filename, privacy_level in outputs.items():
-        if privacy_level == "moderately_sensitive":
-            ok, job_msg, file_msg = check_l4_file(
-                job_definition, filename, sizes[filename], workspace_dir
-            )
+    # local run currently does not have a level 4 directory, so exit early
+    if not medium_privacy_dir:
+        return excluded_job_msgs
+
+    # Copy out medium privacy files to L4
+    for filename in l4_files:
+        src = workspace_dir / filename
+        dst = medium_privacy_dir / filename
+        message_file = medium_privacy_dir / (filename + ".txt")
 
-            if not ok:
-                excluded_files[filename] = job_msg
+        if filename in excluded_file_msgs:
+            message_file.parent.mkdir(exist_ok=True, parents=True)
+            message_file.write_text(excluded_file_msgs[filename])
+        else:
+            volumes.copy_file(src, dst)
+            # if it previously had a message, delete it
+            delete_files_from_directory(medium_privacy_dir, [message_file])
+
+    # Update manifest with file metadata
+    manifest = read_manifest_file(medium_privacy_dir, job_definition.workspace)
+
+    for filename, level in outputs.items():
+        abspath = workspace_dir / filename
+        manifest["outputs"][filename] = get_output_metadata(
+            abspath,
+            level,
+            job_id=job_definition.id,
+            job_request=job_definition.job_request_id,
+            action=job_definition.action,
+            commit=job_definition.study.commit,
+            excluded=filename in excluded_file_msgs,
+            message=excluded_job_msgs.get(filename),
+        )
 
-        # local run currently does not have a level 4 directory
-        if medium_privacy_dir:
-            message_file = medium_privacy_dir / (filename + ".txt")
+    write_manifest_file(medium_privacy_dir, manifest)
 
-            if ok:
-                volumes.copy_file(
-                    workspace_dir / filename, medium_privacy_dir / filename
-                )
-                # if it previously had a too big notice, delete it
-                delete_files_from_directory(medium_privacy_dir, [message_file])
-            else:
-                message_file.parent.mkdir(exist_ok=True, parents=True)
-                message_file.write_text(file_msg)
-
-        # this can be removed once osrelease is dead
-        write_manifest_file(
-            medium_privacy_dir,
-            {
-                # this currently needs to exist, but is not used
-                "repo": None,
-                "workspace": job_definition.workspace,
-            },
-        )
+    return excluded_job_msgs
 
-    return excluded_files
+
+def get_output_metadata(
+    abspath, level, job_id, job_request, action, commit, excluded, message=None
+):
+    stat = abspath.stat()
+    with abspath.open("rb") as fp:
+        content_hash = file_digest(fp, "sha256").hexdigest()
+
+    return {
+        "level": level,
+        "job_id": job_id,
+        "job_request": job_request,
+        "action": action,
+        "commit": commit,
+        "size": stat.st_size,
+        "timestamp": stat.st_mtime,
+        "content_hash": content_hash,
+        "excluded": excluded,
+        "message": message,
+    }
 
 
 MAX_SIZE_MSG = """
@@ -804,6 +844,21 @@ def redact_environment_variables(container_metadata):
     container_metadata["Config"]["Env"] = redacted_vars
 
 
+def read_manifest_file(workspace_dir, workspace):
+    manifest_file = workspace_dir / METADATA_DIR / MANIFEST_FILE
+
+    if manifest_file.exists():
+        manifest = json.loads(manifest_file.read_text())
+        manifest.setdefault("outputs", {})
+        return manifest
+
+    return {
+        "workspace": workspace,
+        "repo": None,  # old key, no longer needed
+        "outputs": {},
+    }
+
+
 def write_manifest_file(workspace_dir, manifest):
     manifest_file = workspace_dir / METADATA_DIR / MANIFEST_FILE
     manifest_file.parent.mkdir(exist_ok=True, parents=True)
diff --git a/jobrunner/lib/__init__.py b/jobrunner/lib/__init__.py
index a9fa3753..274e780f 100644
--- a/jobrunner/lib/__init__.py
+++ b/jobrunner/lib/__init__.py
@@ -3,6 +3,7 @@
 import warnings
 from contextlib import contextmanager
 from datetime import datetime
+from hashlib import new
 
 
 @contextmanager
@@ -23,6 +24,53 @@ def atomic_writer(dest):
     tmp.replace(dest)
 
 
+# port of Python 3.11's hashlib.file_digest
+def file_digest(fileobj, digest, /, *, _bufsize=2**18):
+    """Hash the contents of a file-like object. Returns a digest object.
+
+    *fileobj* must be a file-like object opened for reading in binary mode.
+    It accepts file objects from open(), io.BytesIO(), and SocketIO objects.
+    The function may bypass Python's I/O and use the file descriptor *fileno*
+    directly.
+
+    *digest* must either be a hash algorithm name as a *str*, a hash
+    constructor, or a callable that returns a hash object.
+    """
+    # On Linux we could use AF_ALG sockets and sendfile() to achieve zero-copy
+    # hashing with hardware acceleration.
+    if isinstance(digest, str):
+        digestobj = new(digest)
+    else:
+        digestobj = digest()
+
+    if hasattr(fileobj, "getbuffer"):
+        # io.BytesIO object, use zero-copy buffer
+        digestobj.update(fileobj.getbuffer())
+        return digestobj
+
+    # Only binary files implement readinto().
+    if not (
+        hasattr(fileobj, "readinto")
+        and hasattr(fileobj, "readable")
+        and fileobj.readable()
+    ):
+        raise ValueError(
+            f"'{fileobj!r}' is not a file-like object in binary reading mode."
+        )
+
+    # binary file, socket.SocketIO object
+    # Note: socket I/O uses different syscalls than file I/O.
+    buf = bytearray(_bufsize)  # Reusable buffer to reduce allocations.
+    view = memoryview(buf)
+    while True:
+        size = fileobj.readinto(buf)
+        if size == 0:
+            break  # EOF
+        digestobj.update(view[:size])
+
+    return digestobj
+
+
 def datestr_to_ns_timestamp(datestr):
     """Parses a datestring with nanoseconds in it into integer ns timestamp.
 
diff --git a/tests/test_local_executor.py b/tests/test_local_executor.py
index 5f7e5922..30da7052 100644
--- a/tests/test_local_executor.py
+++ b/tests/test_local_executor.py
@@ -368,6 +368,22 @@ def test_finalize_success(docker_cleanup, job_definition, tmp_work_dir, volume_a
     assert log_dir.exists()
     assert log_file.exists()
 
+    level4_dir = local.get_medium_privacy_workspace(job_definition.workspace)
+    manifest = local.read_manifest_file(level4_dir, job_definition.workspace)
+
+    metadata = manifest["outputs"]["output/summary.csv"]
+    assert metadata["level"] == "moderately_sensitive"
+    assert metadata["job_id"] == job_definition.id
+    assert metadata["job_request"] == job_definition.job_request_id
+    assert metadata["action"] == job_definition.action
+    assert metadata["commit"] == job_definition.study.commit
+    assert metadata["excluded"] is False
+    assert metadata["size"] == 0
+    assert (
+        metadata["content_hash"]
+        == "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
+    )
+
 
 @pytest.mark.needs_docker
 def test_finalize_failed(docker_cleanup, job_definition, tmp_work_dir, volume_api):
@@ -550,12 +566,21 @@ def test_finalize_large_level4_outputs(
 
     if not local_run:
         level4_dir = local.get_medium_privacy_workspace(job_definition.workspace)
+
         message_file = level4_dir / "output/output.txt.txt"
         txt = message_file.read_text()
         assert "output/output.txt" in txt
         assert "1.0Mb" in txt
         assert "0.5Mb" in txt
 
+        manifest = local.read_manifest_file(level4_dir, job_definition.workspace)
+
+        assert manifest["outputs"]["output/output.txt"]["excluded"]
+        assert (
+            manifest["outputs"]["output/output.txt"]["message"]
+            == "File size of 1.0Mb is larger that limit of 0.5Mb."
+        )
+
 
 @pytest.mark.needs_docker
 def test_finalize_invalid_file_type(docker_cleanup, job_definition, tmp_work_dir):
@@ -594,6 +619,14 @@ def test_finalize_invalid_file_type(docker_cleanup, job_definition, tmp_work_dir
     assert "Invalid moderately_sensitive outputs:" in log
     assert "output/output.rds - File type of .rds is not valid level 4 file" in log
 
+    manifest = local.read_manifest_file(level4_dir, job_definition.workspace)
+
+    assert manifest["outputs"]["output/output.rds"]["excluded"]
+    assert (
+        manifest["outputs"]["output/output.rds"]["message"]
+        == "File type of .rds is not valid level 4 file"
+    )
+
 
 @pytest.mark.needs_docker
 def test_finalize_patient_id_header(
@@ -641,6 +674,14 @@ def test_finalize_patient_id_header(
     assert "output/output.csv" in txt
     assert "patient_id" in txt
 
+    manifest = local.read_manifest_file(level4_dir, job_definition.workspace)
+
+    assert manifest["outputs"]["output/output.csv"]["excluded"]
+    assert (
+        manifest["outputs"]["output/output.csv"]["message"]
+        == "File has patient_id column"
+    )
+
 
 @pytest.mark.needs_docker
 def test_finalize_large_level4_outputs_cleanup(
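
For downstream consumers such as airlock, a minimal sketch (not part of the
patch) of reading the manifest this change writes. It assumes METADATA_DIR and
MANIFEST_FILE resolve to metadata/manifest.json, as the commit subject
suggests, and the workspace path below is a placeholder; the keys mirror
read_manifest_file() and get_output_metadata() above.

    import json
    from pathlib import Path

    # Placeholder path for a level 4 (medium privacy) workspace root.
    medium_privacy_dir = Path("/path/to/medium-privacy/workspace")

    manifest = json.loads(
        (medium_privacy_dir / "metadata" / "manifest.json").read_text()
    )

    for filename, meta in manifest["outputs"].items():
        if meta["excluded"]:
            # "message" records why the file was withheld from level 4
            print(f"{filename}: excluded - {meta['message']}")
        else:
            print(
                f"{filename}: {meta['level']}, {meta['size']} bytes, "
                f"sha256={meta['content_hash']}, commit={meta['commit']}"
            )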
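The file_digest port can be sanity-checked against hashlib directly; a short
usage sketch, assuming only the code added to jobrunner/lib above:

    import hashlib
    import io

    from jobrunner.lib import file_digest

    data = b"example output bytes"

    # A BytesIO object takes the zero-copy getbuffer() path; a real file
    # opened with open(path, "rb") takes the readinto() loop instead.
    digest = file_digest(io.BytesIO(data), "sha256").hexdigest()
    assert digest == hashlib.sha256(data).hexdigest()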