diff --git a/jobrunner/executors/local.py b/jobrunner/executors/local.py index d79cecc5..9a70eea3 100644 --- a/jobrunner/executors/local.py +++ b/jobrunner/executors/local.py @@ -22,7 +22,7 @@ JobStatus, Privacy, ) -from jobrunner.lib import datestr_to_ns_timestamp, docker +from jobrunner.lib import datestr_to_ns_timestamp, docker, file_digest from jobrunner.lib.git import checkout_commit from jobrunner.lib.path_utils import list_dir_with_ignore_patterns from jobrunner.lib.string_utils import tabulate @@ -493,53 +493,93 @@ def persist_outputs(job_definition, outputs, job_metadata): # Extract outputs to workspace workspace_dir = get_high_privacy_workspace(job_definition.workspace) - excluded_files = {} + excluded_job_msgs = {} + excluded_file_msgs = {} sizes = {} - for filename in outputs.keys(): + # copy all files into workspace long term storage + for filename, level in outputs.items(): log.info(f"Extracting output file: {filename}") - size = volumes.get_volume_api(job_definition).copy_from_volume( - job_definition, filename, workspace_dir / filename + dst = workspace_dir / filename + sizes[filename] = volumes.get_volume_api(job_definition).copy_from_volume( + job_definition, filename, dst ) - sizes[filename] = size - # Copy out medium privacy files + l4_files = [ + filename + for filename, level in outputs.items() + if level == "moderately_sensitive" + ] + + # check any L4 files are vaild + for filename in l4_files: + ok, job_msg, file_msg = check_l4_file( + job_definition, filename, sizes[filename], workspace_dir + ) + if not ok: + excluded_job_msgs[filename] = job_msg + excluded_file_msgs[filename] = file_msg + medium_privacy_dir = get_medium_privacy_workspace(job_definition.workspace) - for filename, privacy_level in outputs.items(): - if privacy_level == "moderately_sensitive": - ok, job_msg, file_msg = check_l4_file( - job_definition, filename, sizes[filename], workspace_dir - ) + # local run currently does not have a level 4 directory, so exit early + if not medium_privacy_dir: + return excluded_job_msgs + + # Copy out medium privacy files to L4 + for filename in l4_files: + src = workspace_dir / filename + dst = medium_privacy_dir / filename + message_file = medium_privacy_dir / (filename + ".txt") - if not ok: - excluded_files[filename] = job_msg + if filename in excluded_file_msgs: + message_file.parent.mkdir(exist_ok=True, parents=True) + message_file.write_text(excluded_file_msgs[filename]) + else: + volumes.copy_file(src, dst) + # if it previously had a message, delete it + delete_files_from_directory(medium_privacy_dir, [message_file]) + + # Update manifest with file metdata + manifest = read_manifest_file(medium_privacy_dir, job_definition.workspace) + + for filename, level in outputs.items(): + abspath = workspace_dir / filename + manifest["outputs"][filename] = get_output_metadata( + abspath, + level, + job_id=job_definition.id, + job_request=job_definition.job_request_id, + action=job_definition.action, + commit=job_definition.study.commit, + excluded=filename in excluded_file_msgs, + message=excluded_job_msgs.get(filename), + ) - # local run currently does not have a level 4 directory - if medium_privacy_dir: - message_file = medium_privacy_dir / (filename + ".txt") + write_manifest_file(medium_privacy_dir, manifest) - if ok: - volumes.copy_file( - workspace_dir / filename, medium_privacy_dir / filename - ) - # if it previously had a too big notice, delete it - delete_files_from_directory(medium_privacy_dir, [message_file]) - else: - message_file.parent.mkdir(exist_ok=True, parents=True) - message_file.write_text(file_msg) - - # this can be removed once osrelease is dead - write_manifest_file( - medium_privacy_dir, - { - # this currently needs to exist, but is not used - "repo": None, - "workspace": job_definition.workspace, - }, - ) + return excluded_job_msgs - return excluded_files + +def get_output_metadata( + abspath, level, job_id, job_request, action, commit, excluded, message=None +): + stat = abspath.stat() + with abspath.open("rb") as fp: + content_hash = file_digest(fp, "sha256").hexdigest() + + return { + "level": level, + "job_id": job_id, + "job_request": job_request, + "action": action, + "commit": commit, + "size": stat.st_size, + "timestamp": stat.st_mtime, + "content_hash": content_hash, + "excluded": excluded, + "message": message, + } MAX_SIZE_MSG = """ @@ -804,6 +844,21 @@ def redact_environment_variables(container_metadata): container_metadata["Config"]["Env"] = redacted_vars +def read_manifest_file(workspace_dir, workspace): + manifest_file = workspace_dir / METADATA_DIR / MANIFEST_FILE + + if manifest_file.exists(): + manifest = json.loads(manifest_file.read_text()) + manifest.setdefault("outputs", {}) + return manifest + + return { + "workspace": workspace, + "repo": None, # old key, no longer needed + "outputs": {}, + } + + def write_manifest_file(workspace_dir, manifest): manifest_file = workspace_dir / METADATA_DIR / MANIFEST_FILE manifest_file.parent.mkdir(exist_ok=True, parents=True) diff --git a/jobrunner/lib/__init__.py b/jobrunner/lib/__init__.py index a9fa3753..274e780f 100644 --- a/jobrunner/lib/__init__.py +++ b/jobrunner/lib/__init__.py @@ -3,6 +3,7 @@ import warnings from contextlib import contextmanager from datetime import datetime +from hashlib import new @contextmanager @@ -23,6 +24,53 @@ def atomic_writer(dest): tmp.replace(dest) +# port of python 3.11's file_digest +def file_digest(fileobj, digest, /, *, _bufsize=2**18): + """Hash the contents of a file-like object. Returns a digest object. + + *fileobj* must be a file-like object opened for reading in binary mode. + It accepts file objects from open(), io.BytesIO(), and SocketIO objects. + The function may bypass Python's I/O and use the file descriptor *fileno* + directly. + + *digest* must either be a hash algorithm name as a *str*, a hash + constructor, or a callable that returns a hash object. + """ + # On Linux we could use AF_ALG sockets and sendfile() to archive zero-copy + # hashing with hardware acceleration. + if isinstance(digest, str): + digestobj = new(digest) + else: + digestobj = digest() + + if hasattr(fileobj, "getbuffer"): + # io.BytesIO object, use zero-copy buffer + digestobj.update(fileobj.getbuffer()) + return digestobj + + # Only binary files implement readinto(). + if not ( + hasattr(fileobj, "readinto") + and hasattr(fileobj, "readable") + and fileobj.readable() + ): + raise ValueError( + f"'{fileobj!r}' is not a file-like object in binary reading mode." + ) + + # binary file, socket.SocketIO object + # Note: socket I/O uses different syscalls than file I/O. + buf = bytearray(_bufsize) # Reusable buffer to reduce allocations. + view = memoryview(buf) + while True: + size = fileobj.readinto(buf) + if size == 0: + break # EOF + digestobj.update(view[:size]) + + return digestobj + + def datestr_to_ns_timestamp(datestr): """Parses a datestring with nanoseconds in it into integer ns timestamp. diff --git a/tests/test_local_executor.py b/tests/test_local_executor.py index 5f7e5922..30da7052 100644 --- a/tests/test_local_executor.py +++ b/tests/test_local_executor.py @@ -368,6 +368,22 @@ def test_finalize_success(docker_cleanup, job_definition, tmp_work_dir, volume_a assert log_dir.exists() assert log_file.exists() + level4_dir = local.get_medium_privacy_workspace(job_definition.workspace) + manifest = local.read_manifest_file(level4_dir, job_definition) + + metadata = manifest["outputs"]["output/summary.csv"] + assert metadata["level"] == "moderately_sensitive" + assert metadata["job_id"] == job_definition.id + assert metadata["job_request"] == job_definition.job_request_id + assert metadata["action"] == job_definition.action + assert metadata["commit"] == job_definition.study.commit + assert metadata["excluded"] is False + assert metadata["size"] == 0 + assert ( + metadata["content_hash"] + == "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" + ) + @pytest.mark.needs_docker def test_finalize_failed(docker_cleanup, job_definition, tmp_work_dir, volume_api): @@ -550,12 +566,21 @@ def test_finalize_large_level4_outputs( if not local_run: level4_dir = local.get_medium_privacy_workspace(job_definition.workspace) + message_file = level4_dir / "output/output.txt.txt" txt = message_file.read_text() assert "output/output.txt" in txt assert "1.0Mb" in txt assert "0.5Mb" in txt + manifest = local.read_manifest_file(level4_dir, job_definition) + + assert manifest["outputs"]["output/output.txt"]["excluded"] + assert ( + manifest["outputs"]["output/output.txt"]["message"] + == "File size of 1.0Mb is larger that limit of 0.5Mb." + ) + @pytest.mark.needs_docker def test_finalize_invalid_file_type(docker_cleanup, job_definition, tmp_work_dir): @@ -594,6 +619,14 @@ def test_finalize_invalid_file_type(docker_cleanup, job_definition, tmp_work_dir assert "Invalid moderately_sensitive outputs:" in log assert "output/output.rds - File type of .rds is not valid level 4 file" in log + manifest = local.read_manifest_file(level4_dir, job_definition) + + assert manifest["outputs"]["output/output.rds"]["excluded"] + assert ( + manifest["outputs"]["output/output.rds"]["message"] + == "File type of .rds is not valid level 4 file" + ) + @pytest.mark.needs_docker def test_finalize_patient_id_header( @@ -641,6 +674,14 @@ def test_finalize_patient_id_header( assert "output/output.csv" in txt assert "patient_id" in txt + manifest = local.read_manifest_file(level4_dir, job_definition) + + assert manifest["outputs"]["output/output.csv"]["excluded"] + assert ( + manifest["outputs"]["output/output.csv"]["message"] + == "File has patient_id column" + ) + @pytest.mark.needs_docker def test_finalize_large_level4_outputs_cleanup(