diff --git a/README.md b/README.md
index 25933c463..663a4846b 100644
--- a/README.md
+++ b/README.md
@@ -248,3 +248,42 @@ PIXL data extracts include the below assumptions
 - (MRN, Accession number) is unique identifier for a report/DICOM study pair
 - Patients have a single _relevant_ MRN
+
+
+
+## File journey overview
+The files present at each step of the pipeline.
+
+### Resources in source repo (for test only)
+For building test versions of the OMOP ES extract dir.
+```
+test/resources/omop/batch_1/...
+....................batch_2/...
+....................batch_3/public /*.parquet
+............................private/*.parquet
+............................extract_summary.json
+```
+
+### OMOP ES extract dir (input to PIXL)
+As passed to the CLI.
+Square brackets (e.g. `[batch_1]`) denote an optional intermediate directory.
+```
+EXTRACT_DIR/[batch_1]/...
+.........../[batch_2]/public /*.parquet
+......................private/*.parquet
+......................extract_summary.json
+```
+
+### PIXL Export dir (PIXL intermediate)
+```
+EXPORT_ROOT/PROJECT_SLUG/all_extracts/EXTRACT_DATETIME/radiology/radiology.parquet
+....................................................../omop/public/[batch_1]/*.parquet
+...................................................................[batch_2]/*.parquet
+```
+
+### FTP server
+```
+FTPROOT/PROJECT_SLUG/EXTRACT_DATETIME/parquet/radiology/radiology.parquet
+..............................................omop/public/[batch_1]/*.parquet
+..........................................................[batch_2]/*.parquet
+```
diff --git a/cli/README.md b/cli/README.md
index 3df5f0392..114717068 100644
--- a/cli/README.md
+++ b/cli/README.md
@@ -44,21 +44,42 @@ pixl --help
 Populate queue for Imaging and EHR extraction
 
 ```bash
-pixl populate
+pixl populate PARQUET_DIR
 ```
 
-where `parquet_dir` contains at least the following files:
+where `PARQUET_DIR` matches one of the following two layouts, which the CLI will automatically detect:
 
+* In single-batch mode:
 ```sh
-parquet_dir
-├── extract_summary.json
-├── private
-│   ├── PERSON_LINKS.parquet
-│   └── PROCEDURE_OCCURRENCE_LINKS.parquet
-└── public
-    └── PROCEDURE_OCCURRENCE.parquet
+└── PARQUET_DIR
+    ├── extract_summary.json
+    ├── private
+    │   ├── PERSON_LINKS.parquet
+    │   └── PROCEDURE_OCCURRENCE_LINKS.parquet
+    └── public
+        └── PROCEDURE_OCCURRENCE.parquet
 ```
+* In multi-batch mode:
+```sh
+└── PARQUET_DIR
+    ├── batch_1
+    │   ├── extract_summary.json
+    │   ├── private
+    │   │   ├── PERSON_LINKS.parquet
+    │   │   └── PROCEDURE_OCCURRENCE_LINKS.parquet
+    │   └── public
+    │       └── PROCEDURE_OCCURRENCE.parquet
+    └── batch_2
+        ├── extract_summary.json
+        ├── private
+        │   ├── PERSON_LINKS.parquet
+        │   └── PROCEDURE_OCCURRENCE_LINKS.parquet
+        └── public
+            └── PROCEDURE_OCCURRENCE.parquet
+```
+
+
 Start the imaging extraction
 
 ```bash
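
The CLI tells the two layouts apart purely by where `extract_summary.json` sits; `determine_batch_structure` in `cli/src/pixl_cli/_io.py` below implements this. A minimal sketch of the rule, with a hypothetical path:

```python
from pathlib import Path

def is_single_batch(extract_dir: Path) -> bool:
    """Single-batch extracts keep extract_summary.json at the top level;
    multi-batch extracts keep one copy inside each batch_* subdirectory."""
    return (extract_dir / "extract_summary.json").exists()

print(is_single_batch(Path("/data/PARQUET_DIR")))  # hypothetical path
```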
diff --git a/cli/src/pixl_cli/_io.py b/cli/src/pixl_cli/_io.py
index b9a368d52..a6edbf8d5 100644
--- a/cli/src/pixl_cli/_io.py
+++ b/cli/src/pixl_cli/_io.py
@@ -47,20 +47,79 @@ def messages_from_state_file(filepath: Path) -> list[Message]:
     return [deserialise(line) for line in filepath.open().readlines() if string_is_non_empty(line)]
 
 
-def config_from_log_file(parquet_path: Path) -> tuple[str, datetime]:
-    log_file = parquet_path / "extract_summary.json"
+def determine_batch_structure(extract_path: Path) -> tuple[str, datetime, list[Path]]:
+    """
+    Take the top-level extract directory, determine whether it contains a single- or
+    multi-batch extract, and return the resulting metadata.
+    :param extract_path: the top-level path that may contain either a single batch or one or
+        more directories containing batches.
+    :returns: ( project name common to all the batches,
+                extract timestamp common to all the batches,
+                list of directories containing batches
+                - can be just the top-level directory in the case of a single batch )
+    :raises RuntimeError: if config files are non-existent or contradictory
+    """
+    log_filename = "extract_summary.json"
+    single_batch_logfile = extract_path / log_filename
+    if single_batch_logfile.exists():
+        project_name, omop_es_timestamp = _config_from_log_file(single_batch_logfile)
+        return project_name, omop_es_timestamp, [extract_path]
+
+    # should it really be 'extract_*'?
+    # The order doesn't matter functionally, but sort here so failures are more repeatable.
+    batch_dirs = sorted(extract_path.glob("batch_*"))
+    extract_ids = [_config_from_log_file(log_file / log_filename) for log_file in batch_dirs]
+    # There should be one or more log files, all with the same identifiers
+    if not extract_ids:
+        err = f"No batched or unbatched log files found in {extract_path}"
+        raise RuntimeError(err)
+
+    distinct_extract_ids = set(extract_ids)
+    if len(distinct_extract_ids) != 1:
+        err = (
+            f"Found {len(batch_dirs)} log files with different IDs: "
+            f"Batch dirs: {batch_dirs}, IDs: {extract_ids}"
+        )
+        raise RuntimeError(err)
+
+    logger.info(f"Batches found: ({len(batch_dirs)}) {batch_dirs}")
+    project_name, omop_es_timestamp = distinct_extract_ids.pop()
+    return project_name, omop_es_timestamp, batch_dirs
+
+
+def _config_from_log_file(log_file: Path) -> tuple[str, datetime]:
+    """
+    Load the extract config from the given log file. This function has no knowledge of
+    multi- vs single-batch layouts, so it should not be called directly.
+    """
     logs = json.load(log_file.open())
     project_name = logs["settings"]["cdm_source_name"]
     omop_es_timestamp = datetime.fromisoformat(logs["datetime"])
     return project_name, omop_es_timestamp
 
 
-def copy_parquet_return_logfile_fields(parquet_path: Path) -> tuple[str, datetime]:
-    """Copy public parquet file to extracts directory, and return fields from logfile"""
-    project_name, omop_es_timestamp = config_from_log_file(parquet_path)
+def copy_parquet_return_logfile_fields(extract_path: Path) -> tuple[str, datetime, list[Path]]:
+    """
+    Copy each batch's public parquet files to the exports directory, and return fields
+    from the logfile.
+    :param extract_path: top-level dir for the OMOP ES extract
+        (either a single- or multi-batch extract)
+    :returns: ( slugified project name of all the batches,
+                extract timestamp common to all the batches,
+                list of directories containing batches
+                - can be just the top-level directory in the case of a single batch )
+    :raises RuntimeError: if no log file can be found, or there is more than one batch and
+        the project names or timestamps don't match.
+    """
+    project_name, omop_es_timestamp, batch_dirs = determine_batch_structure(extract_path)
     extract = ParquetExport(project_name, omop_es_timestamp, HOST_EXPORT_ROOT_DIR)
-    project_name_slug = extract.copy_to_exports(parquet_path)
-    return project_name_slug, omop_es_timestamp
+    for batch in batch_dirs:
+        batch_subdir = batch.relative_to(extract_path)
+        logger.info(f"Copying extract files from batch at {batch}...")
+        extract.copy_to_exports(batch, batch_subdir)
+        logger.info(f"Done copying extract files from batch at {batch}")
+    return extract.project_slug, omop_es_timestamp, batch_dirs
 
 
 def messages_from_parquet(
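
A usage sketch of the helpers above, tying them to the `populate` flow: detect the layout, copy each batch's public files into the export tree, then build messages batch by batch. The extract path is hypothetical:

```python
from pathlib import Path

from pixl_cli._io import copy_parquet_return_logfile_fields, messages_from_parquet

extract_dir = Path("/data/EXTRACT_DIR")  # hypothetical, laid out as in the CLI README

# Detects single- vs multi-batch, copies each batch's public parquet files into
# the export tree, and returns the slugified project name.
# Multi-batch: batch_dirs == [extract_dir / "batch_1", extract_dir / "batch_2"]
# Single-batch: batch_dirs == [extract_dir]
project_slug, omop_es_timestamp, batch_dirs = copy_parquet_return_logfile_fields(extract_dir)

# Messages are gathered across all batches before anything is queued, so a
# corrupt later batch fails the run before any message has been sent.
messages = []
for batch in batch_dirs:
    messages.extend(messages_from_parquet(batch, project_slug, omop_es_timestamp))
```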
diff --git a/cli/src/pixl_cli/main.py b/cli/src/pixl_cli/main.py
index 0a232e839..19092a546 100644
--- a/cli/src/pixl_cli/main.py
+++ b/cli/src/pixl_cli/main.py
@@ -28,8 +28,8 @@
 from pixl_cli._config import cli_config
 from pixl_cli._database import filter_exported_or_add_to_db
 from pixl_cli._io import (
-    config_from_log_file,
     copy_parquet_return_logfile_fields,
+    determine_batch_structure,
     messages_from_parquet,
     messages_from_state_file,
 )
@@ -80,8 +80,10 @@ def populate(parquet_dir: Path, *, restart: bool, queues: str) -> None:
         └── extract_summary.json
     """
     logger.info(f"Populating queue(s) {queues} from {parquet_dir}")
-    project_name, omop_es_datetime = copy_parquet_return_logfile_fields(parquet_dir)
-    messages = messages_from_parquet(parquet_dir, project_name, omop_es_datetime)
+    project_name, omop_es_datetime, batch_dirs = copy_parquet_return_logfile_fields(parquet_dir)
+    messages = []
+    for batch in batch_dirs:
+        messages.extend(messages_from_parquet(batch, project_name, omop_es_datetime))
 
     for queue in queues.split(","):
         state_filepath = state_filepath_for_queue(queue)
@@ -113,13 +115,13 @@ def extract_radiology_reports(parquet_dir: Path) -> None:
     PARQUET_DIR: Directory containing the extract_summary.json log file
     defining which extract to export radiology reports for.
     """
-    project_name, omop_es_datetime = config_from_log_file(parquet_dir)
+    project_name, omop_es_timestamp, _batch_dirs = determine_batch_structure(parquet_dir)
 
     # Call the EHR API
     api_config = api_config_for_queue("ehr")
     response = requests.post(
         url=f"{api_config.base_url}/export-patient-data",
-        json={"project_name": project_name, "extract_datetime": omop_es_datetime.isoformat()},
+        json={"project_name": project_name, "extract_datetime": omop_es_timestamp.isoformat()},
         timeout=10,
     )
diff --git a/cli/tests/conftest.py b/cli/tests/conftest.py
index f8d0f69d8..9ff7f8815 100644
--- a/cli/tests/conftest.py
+++ b/cli/tests/conftest.py
@@ -15,12 +15,17 @@
 from __future__ import annotations
 
 import pathlib
+import shutil
+from typing import TYPE_CHECKING
 
 import pytest
 from core.db.models import Base, Extract, Image
 from sqlalchemy import Engine, create_engine
 from sqlalchemy.orm import Session, sessionmaker
 
+if TYPE_CHECKING:
+    from collections.abc import Callable
+
 
 @pytest.fixture(autouse=True)
 def export_dir(tmp_path_factory: pytest.TempPathFactory) -> pathlib.Path:
@@ -30,8 +35,37 @@ def export_dir(tmp_path_factory: pytest.TempPathFactory) -> pathlib.Path:
 
 @pytest.fixture()
 def resources() -> pathlib.Path:
-    """Test resources directory path."""
-    return pathlib.Path(__file__).parent / "resources"
+    """Top-level test resources directory path."""
+    return pathlib.Path(__file__).parents[2] / "test" / "resources"
+
+
+@pytest.fixture()
+def omop_es_batch_generator(resources, tmp_path_factory) -> Callable[..., pathlib.Path]:
+    """
+    Return a callable which, by default, returns a path to (a copy of) the
+    resources/omop/batch_1/ directory, as if it were a single-batch extract.
+    You can also request any subset of the resources/omop/batch_* directories to be present
+    in the returned directory. Useful for testing different setups without cluttering the
+    resources/omop directory with copied files.
+    """
+    omop_batch_root = resources / "omop"
+    # keep separate from a test that might want to use tmp_path
+    tmp = tmp_path_factory.mktemp("copied_omop_es_input")
+
+    def inner_gen(batches=None, *, single_batch: bool = True) -> pathlib.Path:
+        if batches is None:
+            batches = ["batch_1"]
+        if single_batch:
+            assert len(batches) == 1
+            # the root tmp dir will already exist; we are effectively replacing it
+            shutil.copytree(omop_batch_root / batches[0], tmp, dirs_exist_ok=True)
+        else:
+            assert batches
+            for b in batches:
+                shutil.copytree(omop_batch_root / b, tmp / b)
+        return tmp
+
+    return inner_gen
 
 
 @pytest.fixture(scope="module")
diff --git a/cli/tests/resources/omop/private/PERSON_LINKS.parquet b/cli/tests/resources/omop/private/PERSON_LINKS.parquet
deleted file mode 100644
index bef07a1a2..000000000
Binary files a/cli/tests/resources/omop/private/PERSON_LINKS.parquet and /dev/null differ
diff --git a/cli/tests/resources/omop/public/PROCEDURE_OCCURRENCE.parquet b/cli/tests/resources/omop/public/PROCEDURE_OCCURRENCE.parquet
deleted file mode 100644
index 0b526f617..000000000
Binary files a/cli/tests/resources/omop/public/PROCEDURE_OCCURRENCE.parquet and /dev/null differ
diff --git a/cli/tests/test_copy_omop.py b/cli/tests/test_copy_omop.py
index ac3226913..a5bbf442c 100644
--- a/cli/tests/test_copy_omop.py
+++ b/cli/tests/test_copy_omop.py
@@ -15,24 +15,25 @@
 from __future__ import annotations
 
 import datetime
+from pathlib import Path
 
 import pytest
 from core.exports import ParquetExport
 
 
-def test_new_project_copies(resources, export_dir):
+def test_new_project_copies(omop_es_batch_generator, export_dir):
     """
     Given a valid export directory that hasn't been exported before
     When copy to exports is run
     Then the public files should be copied and symlinked to the latest export directory
     """
     # ARRANGE
-    input_dir = resources / "omop"
+    input_dir = omop_es_batch_generator()
     project_name = "Really great cool project"
     input_date = datetime.datetime.fromisoformat("2020-06-10T18:00:00")
     omop_files = ParquetExport(project_name, input_date, export_dir)
 
     # ACT
-    omop_files.copy_to_exports(input_dir)
+    omop_files.copy_to_exports(input_dir, Path())
 
     # ASSERT
     output_base = omop_files.export_dir / "really-great-cool-project"
@@ -51,7 +52,7 @@
     assert expected_files == sorted([x.stem for x in symlinked_files])
 
 
-def test_second_export(resources, export_dir):
+def test_second_export(omop_es_batch_generator, export_dir):
     """
     Given one export already exists for the project
     When a second export with a different timestamp is run for the same project
@@ -59,18 +60,18 @@
     and the symlinked dir should point to the most recently copied dir
     """
     # ARRANGE
-    input_dir = resources / "omop"
+    input_dir = omop_es_batch_generator()
     project_name = "Really great cool project"
 
     first_export_datetime = datetime.datetime.fromisoformat("2020-06-10T18:00:00")
 
     omop_files = ParquetExport(project_name, first_export_datetime, export_dir)
-    omop_files.copy_to_exports(input_dir)
+    omop_files.copy_to_exports(input_dir, Path())
 
     second_export_datetime = datetime.datetime.fromisoformat("2020-07-10T18:00:00")
 
     omop_files = ParquetExport(project_name, second_export_datetime, export_dir)
 
     # ACT
-    omop_files.copy_to_exports(input_dir)
+    omop_files.copy_to_exports(input_dir, Path())
 
     # ASSERT
     output_base = omop_files.export_dir / "really-great-cool-project"
@@ -96,4 +97,4 @@
     input_date = datetime.datetime.fromisoformat("2020-06-10T18:00:00")
     omop_files = ParquetExport(project_name, input_date, export_dir)
     with pytest.raises(FileNotFoundError):
-        omop_files.copy_to_exports(input_dir)
+        omop_files.copy_to_exports(input_dir, Path())
diff --git a/cli/tests/test_messages_from_parquet.py b/cli/tests/test_messages_from_parquet.py
index 432673995..cf11ed20d 100644
--- a/cli/tests/test_messages_from_parquet.py
+++ b/cli/tests/test_messages_from_parquet.py
@@ -15,30 +15,50 @@
 from __future__ import annotations
 
 import datetime
-from typing import TYPE_CHECKING
+import os
+import shutil
 
+import pytest
 from core.patient_queue.message import Message
-from pixl_cli._io import copy_parquet_return_logfile_fields, messages_from_parquet
+from pixl_cli._io import (
+    copy_parquet_return_logfile_fields,
+    determine_batch_structure,
+    messages_from_parquet,
+)
+from pyarrow import ArrowException
 
-if TYPE_CHECKING:
-    from pathlib import Path
-
 
-def test_messages_from_parquet(resources: Path) -> None:
+@pytest.mark.parametrize(
+    ("batches", "single_batch", "expected_message_indexes"),
+    [
+        (["batch_1", "batch_2"], False, [0, 1]),
+        (["batch_1"], True, [0]),
+        (["batch_1"], False, [0]),
+        (["batch_2"], True, [1]),
+        (["batch_2"], False, [1]),
+    ],
+)
+def test_messages_from_parquet(
+    omop_es_batch_generator, batches, single_batch, expected_message_indexes
+) -> None:
     """
     Given a valid OMOP ES extract with 4 procedures, two of which are x-rays.
     When the messages are generated from the directory and the output of logfile parsing
     Then the expected messages for the selected batches should be generated
     """
     # Arrange
-    omop_parquet_dir = resources / "omop"
-    project_name, omop_es_datetime = copy_parquet_return_logfile_fields(omop_parquet_dir)
+    omop_parquet_dir = omop_es_batch_generator(batches, single_batch=single_batch)
+    project_name, omop_es_datetime, batch_dirs = copy_parquet_return_logfile_fields(
+        omop_parquet_dir
+    )
 
     # Act
-    messages = messages_from_parquet(omop_parquet_dir, project_name, omop_es_datetime)
+    messages = []
+    for batch in batch_dirs:
+        messages.extend(messages_from_parquet(batch, project_name, omop_es_datetime))
 
     # Assert
     assert all(isinstance(msg, Message) for msg in messages)
 
-    expected_messages = [
+    all_expected_messages = [
         Message(
             mrn="987654321",
             accession_number="AA12345601",
@@ -56,5 +76,65 @@
             omop_es_timestamp=datetime.datetime.fromisoformat("2023-12-07T14:08:58"),
         ),
     ]
-
+    expected_messages = [all_expected_messages[i] for i in expected_message_indexes]
     assert messages == expected_messages
+
+
+def test_conflicting_batches(omop_es_batch_generator) -> None:
+    """
+    Batches 1 and 3 have different timestamps, so they should fail if they are given to us
+    as part of the same extract.
+    """
+    omop_parquet_dir = omop_es_batch_generator(["batch_1", "batch_3"], single_batch=False)
+    with pytest.raises(RuntimeError, match=r"log files with different IDs.*batch_.*batch_"):
+        copy_parquet_return_logfile_fields(omop_parquet_dir)
+
+
+def test_empty_batches(tmp_path) -> None:
+    """Empty dir, nothing found."""
+    with pytest.raises(RuntimeError, match=r"No batched or unbatched log files found in"):
+        copy_parquet_return_logfile_fields(tmp_path)
+
+
+def test_missing_public(omop_es_batch_generator) -> None:
+    """
+    This error is hard to reach in real life, because a missing public dir would trigger an
+    error from copy_parquet_return_logfile_fields first.
+    """
+    omop_parquet_dir = omop_es_batch_generator(["batch_1"], single_batch=True)
+    # simulate broken input batch
+    shutil.rmtree(omop_parquet_dir / "public", ignore_errors=False)
+
+    project_name, omop_es_datetime, batch_dirs = determine_batch_structure(omop_parquet_dir)
+    with pytest.raises(NotADirectoryError):
+        messages_from_parquet(omop_parquet_dir, project_name, omop_es_datetime)
+
+
+@pytest.mark.parametrize(
+    "file_to_corrupt",
+    [
+        # include all parquet files required to generate messages
+        "public/PROCEDURE_OCCURRENCE.parquet",
+        "private/PROCEDURE_OCCURRENCE_LINKS.parquet",
+        "private/PERSON_LINKS.parquet",
+    ],
+)
+def test_broken_parquet_dir(omop_es_batch_generator, file_to_corrupt) -> None:
+    """
+    Check that if any of the parquet files we use to generate messages is incomplete,
+    message generation fails rather than silently succeeding. We aren't checking the validity
+    of parquet files that we simply copy to the extract dir.
+    We fail even if batch_1 is fine but batch_2 is faulty; this might be an argument
+    for continuing to call messages_from_parquet on *all* batches before sending any messages,
+    as long as that's not too slow and doesn't use too much memory.
+    """
+    omop_parquet_dir = omop_es_batch_generator(["batch_1", "batch_2"], single_batch=False)
+    # Assume the most likely error is an incomplete copy. Because parquet has a magic footer,
+    # even one byte missing at the end should be enough for it to fail.
+    to_corrupt = omop_parquet_dir / "batch_2" / file_to_corrupt
+    new_size = to_corrupt.stat().st_size - 1
+    os.truncate(to_corrupt, new_size)
+    project_name, omop_es_datetime, batch_dirs = determine_batch_structure(omop_parquet_dir)
+    with pytest.raises(ArrowException):  # noqa: PT012 It may not fail on all, but must fail on one
+        for b in batch_dirs:
+            messages_from_parquet(b, project_name, omop_es_datetime)
diff --git a/cli/tests/test_populate.py b/cli/tests/test_populate.py
index 3ee6ec53a..888dfd4b0 100644
--- a/cli/tests/test_populate.py
+++ b/cli/tests/test_populate.py
@@ -28,10 +28,10 @@
 def test_populate_queue_parquet(
-    monkeypatch, resources: Path, queue_name: str = "test_populate"
+    omop_es_batch_generator, monkeypatch, resources: Path, queue_name: str = "test_populate"
 ) -> None:
     """Checks that the patient queue can be populated without error."""
-    omop_parquet_dir = str(resources / "omop")
+    omop_parquet_dir = str(omop_es_batch_generator())
     runner = CliRunner()
 
     # mock producer
diff --git a/pixl_core/pyproject.toml b/pixl_core/pyproject.toml
index c0ed4dc48..4a0326765 100644
--- a/pixl_core/pyproject.toml
+++ b/pixl_core/pyproject.toml
@@ -23,6 +23,7 @@ dependencies = [
     "sqlalchemy==2.0.24",
    "psycopg2-binary==2.9.9",
     "pandas==1.5.1",
+    "pyarrow==14.0.1",
 ]
 
 [project.optional-dependencies]
diff --git a/pixl_core/src/core/exports.py b/pixl_core/src/core/exports.py
index b9a013d2d..aa8ae75c4 100644
--- a/pixl_core/src/core/exports.py
+++ b/pixl_core/src/core/exports.py
@@ -58,22 +58,25 @@ def _get_slugs(project_name: str, extract_datetime: datetime.datetime) -> tuple[
         extract_time_slug = slugify.slugify(extract_datetime.isoformat())
         return project_slug, extract_time_slug
 
-    def copy_to_exports(self, input_omop_dir: pathlib.Path) -> str:
+    def copy_to_exports(self, input_omop_dir: pathlib.Path, rel_batch_name: pathlib.Path) -> str:
         """
         Copy public omop directory as the latest extract for the project.
         Creates directories if they don't already exist.
         :param input_omop_dir: parent path for input omop data, with a "public" subdirectory
+        :param rel_batch_name: batch name which determines the output subdir to copy to, or
+            Path() if this is a single-batch extract
         :raises FileNotFoundError: if there is no public subdirectory in `omop_dir`
         :returns str: the project slug, so this can be registered for export to the DSH
         """
         public_input = input_omop_dir / "public"
 
         # Make directory for exports if they don't exist
-        ParquetExport._mkdir(self.public_output)
-        logger.info("Copying public parquet files from %s to %s", public_input, self.public_output)
+        public_batch_output = self.public_output / rel_batch_name
+        ParquetExport._mkdir(public_batch_output)
+        logger.info("Copying public parquet files from %s to %s", public_input, public_batch_output)
 
         # Copy extract files, overwriting if they exist
-        shutil.copytree(public_input, self.public_output, dirs_exist_ok=True)
+        shutil.copytree(public_input, public_batch_output, dirs_exist_ok=True)
 
         # Symlink this extract to the latest directory
         self.latest_symlink.unlink(missing_ok=True)
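
A sketch of how the new `rel_batch_name` argument drives the output layout; the project name, datetime, and paths here are hypothetical:

```python
import datetime
import pathlib

from core.exports import ParquetExport

export = ParquetExport(
    "Really great cool project",  # slugified internally
    datetime.datetime.fromisoformat("2023-12-07T14:08:58"),
    pathlib.Path("/exports"),  # hypothetical export root
)

# Single-batch extract: public files land directly in .../omop/public/
export.copy_to_exports(pathlib.Path("/data/EXTRACT_DIR"), pathlib.Path())

# Multi-batch extract: each batch keeps its own subdirectory,
# e.g. .../omop/public/batch_1/
export.copy_to_exports(pathlib.Path("/data/EXTRACT_DIR/batch_1"), pathlib.Path("batch_1"))
```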
diff --git a/pixl_core/src/core/upload.py b/pixl_core/src/core/upload.py
index fdcda10ac..3c60c9ad9 100644
--- a/pixl_core/src/core/upload.py
+++ b/pixl_core/src/core/upload.py
@@ -21,6 +21,7 @@
 import ssl
 from datetime import datetime, timezone
 from ftplib import FTP_TLS
+from pathlib import Path
 from typing import TYPE_CHECKING, Any, BinaryIO
 
 if TYPE_CHECKING:
@@ -87,23 +88,34 @@ def upload_dicom_image(zip_content: BinaryIO, pseudo_anon_id: str) -> None:
 
 def upload_parquet_files(parquet_export: ParquetExport) -> None:
     """Upload parquet files to FTPS under PROJECT_SLUG/EXTRACT_DATETIME_SLUG/parquet."""
-    current_extract = parquet_export.public_output.parents[1]
+    source_root_dir = parquet_export.current_extract_base
     # Create the remote directory if it doesn't exist
     ftp = _connect_to_ftp()
     _create_and_set_as_cwd(ftp, parquet_export.project_slug)
     _create_and_set_as_cwd(ftp, parquet_export.extract_time_slug)
     _create_and_set_as_cwd(ftp, "parquet")
-    export_files = [x for x in current_extract.rglob("*.parquet") if x.is_file()]
-    if not export_files:
-        msg = f"No files found in {current_extract}"
+    # Get the upload root directory before we do anything, as we'll need
+    # to return to it (will it always be absolute?)
+    upload_root_dir = Path(ftp.pwd())
+    if not upload_root_dir.is_absolute():
+        logger.error("server remote path is not absolute, what are we going to do?")
+
+    # absolute paths of the source files
+    source_files = [x for x in source_root_dir.rglob("*.parquet") if x.is_file()]
+    if not source_files:
+        msg = f"No files found in {source_root_dir}"
         raise FileNotFoundError(msg)  # throw exception if empty dir
 
-    for path in export_files:
-        with path.open("rb") as handle:
-            command = f"STOR {path.stem}.parquet"
-            logger.debug("Running %s", command)
+    for source_path in source_files:
+        _create_and_set_as_cwd(ftp, str(upload_root_dir))
+        source_rel_path = source_path.relative_to(source_root_dir)
+        source_rel_dir = source_rel_path.parent
+        source_filename_only = source_rel_path.relative_to(source_rel_dir)
+        _create_and_set_as_cwd_multi_path(ftp, source_rel_dir)
+        with source_path.open("rb") as handle:
+            command = f"STOR {source_filename_only}"
 
             # Store the file using a binary handler
             ftp.storbinary(command, handle)
@@ -132,6 +144,19 @@ def _connect_to_ftp() -> FTP_TLS:
     return ftp
 
 
+def _create_and_set_as_cwd_multi_path(ftp: FTP_TLS, remote_multi_dir: Path) -> None:
+    """Create (and cwd into) a multi-level directory path, analogously to mkdir -p."""
+    if remote_multi_dir.is_absolute():
+        # would require some special handling and we don't need it
+        err = "must be relative path"
+        raise ValueError(err)
+    logger.info("_create_and_set_as_cwd_multi_path %s", remote_multi_dir)
+    # path should already be normalised, so assume splitting on "/" is safe
+    sub_dirs = str(remote_multi_dir).split("/")
+    for sd in sub_dirs:
+        _create_and_set_as_cwd(ftp, sd)
+
+
 def _create_and_set_as_cwd(ftp: FTP_TLS, project_dir: str) -> None:
     try:
         ftp.cwd(project_dir)
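
The upload now mirrors the local extract tree onto the server instead of flattening every parquet file into one directory. A sketch of the path arithmetic, with hypothetical values:

```python
from pathlib import Path

source_root = Path("/exports/my-project/all_extracts/2023-12-07t14-08-58")  # hypothetical
source_path = source_root / "omop" / "public" / "batch_1" / "PROCEDURE_OCCURRENCE.parquet"

rel = source_path.relative_to(source_root)  # omop/public/batch_1/PROCEDURE_OCCURRENCE.parquet
rel_dir = rel.parent                        # omop/public/batch_1
filename = rel.name                         # PROCEDURE_OCCURRENCE.parquet

# _create_and_set_as_cwd_multi_path walks the components like `mkdir -p` plus `cd`:
for part in rel_dir.parts:  # ("omop", "public", "batch_1")
    pass  # ftp.cwd(part), creating the directory first if it doesn't exist
```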
diff --git a/pixl_core/tests/conftest.py b/pixl_core/tests/conftest.py
index 98cbde17b..dbf407762 100644
--- a/pixl_core/tests/conftest.py
+++ b/pixl_core/tests/conftest.py
@@ -16,9 +16,10 @@
 import datetime
 import os
 import pathlib
+import shutil
 import subprocess
 from pathlib import Path
-from typing import BinaryIO
+from typing import BinaryIO, Callable
 
 import pytest
 from core.db.models import Base, Extract, Image
@@ -42,6 +43,41 @@
 STUDY_DATE = datetime.date.fromisoformat("2023-01-01")
 
 
+@pytest.fixture()
+def resources() -> pathlib.Path:
+    """Top-level test resources directory path."""
+    return pathlib.Path(__file__).parents[2] / "test" / "resources"
+
+
+@pytest.fixture()
+def omop_es_batch_generator(resources, tmp_path_factory) -> Callable[..., pathlib.Path]:
+    """
+    Return a callable which, by default, returns a path to (a copy of) the
+    resources/omop/batch_1/ directory, as if it were a single-batch extract.
+    You can also request any subset of the resources/omop/batch_* directories to be present
+    in the returned directory. Useful for testing different setups without cluttering the
+    resources/omop directory with copied files.
+    """
+    omop_batch_root = resources / "omop"
+    # keep separate from a test that might want to use tmp_path
+    tmp = tmp_path_factory.mktemp("copied_omop_es_input")
+
+    def inner_gen(batches=None, *, single_batch: bool = True) -> pathlib.Path:
+        if batches is None:
+            batches = ["batch_1"]
+        if single_batch:
+            assert len(batches) == 1
+            # the root tmp dir will already exist; we are effectively replacing it
+            shutil.copytree(omop_batch_root / batches[0], tmp, dirs_exist_ok=True)
+        else:
+            assert batches
+            for b in batches:
+                shutil.copytree(omop_batch_root / b, tmp / b)
+        return tmp
+
+    return inner_gen
+
+
 @pytest.fixture(scope="package")
 def run_containers() -> subprocess.CompletedProcess[bytes]:
     """Run docker containers for tests which require them."""
@@ -58,6 +94,7 @@
         cwd=TEST_DIR,
         shell=True,  # noqa: S602
         timeout=60,
+        capture_output=True,
     )
     subprocess.run(
         b"docker compose down --volumes",
@@ -65,6 +102,7 @@
         cwd=TEST_DIR,
         shell=True,  # noqa: S602
         timeout=60,
+        capture_output=True,
     )
diff --git a/pixl_core/tests/test_upload.py b/pixl_core/tests/test_upload.py
index 5e05d73b7..2a510097a 100644
--- a/pixl_core/tests/test_upload.py
+++ b/pixl_core/tests/test_upload.py
@@ -12,11 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 """Test functionality to upload files to an endpoint."""
-
-
+import filecmp
 import pathlib
 from datetime import datetime, timezone
 
+import pandas as pd
 import pytest
 from core.db.models import Image
 from core.db.queries import get_project_slug_from_db, update_exported_at
@@ -69,25 +69,40 @@ def test_update_exported_and_save(rows_in_session) -> None:
 
 @pytest.mark.usefixtures("run_containers")
-def test_upload_parquet(parquet_export, mounted_data) -> None:
-    """Tests that parquet files are uploaded to the correct location"""
-    # ARRANGE
-
-    parquet_export.copy_to_exports(
-        pathlib.Path(__file__).parents[2] / "test" / "resources" / "omop"
-    )
-    with (parquet_export.public_output.parent / "radiology.parquet").open("w") as handle:
-        handle.writelines(["dummy data"])
+def test_upload_parquet(omop_es_batch_generator, parquet_export, mounted_data) -> None:
+    """
+    GIVEN an OMOP-ES extract of two batches has been added to the export dir
+    WHEN parquet files are uploaded via FTPS
+    THEN the structure of the batches should be preserved on the FTP server
+    """
+    # ARRANGE - a ready-to-upload directory containing the copied OMOP extract and radiology reports
+    batch_names = ["batch_1", "batch_2"]
+    omop_input_path = omop_es_batch_generator(batch_names, single_batch=False)
+    for batch in batch_names:
+        parquet_export.copy_to_exports(
+            omop_input_path / batch,
+            pathlib.Path(batch),
+        )
+    parquet_export.export_radiology(pd.DataFrame(list("dummy"), columns=["D"]))
 
     # ACT
     upload_parquet_files(parquet_export)
+
     # ASSERT
     expected_public_parquet_dir = (
         mounted_data / parquet_export.project_slug / parquet_export.extract_time_slug / "parquet"
     )
     assert expected_public_parquet_dir.exists()
-    assert (expected_public_parquet_dir / "PROCEDURE_OCCURRENCE.parquet").exists()
-    assert (expected_public_parquet_dir / "radiology.parquet").exists()
+    # Print a difference report to aid debugging (it doesn't actually assert anything)
+    dc = filecmp.dircmp(parquet_export.current_extract_base, expected_public_parquet_dir)
+    dc.report_full_closure()
+    assert (
+        expected_public_parquet_dir / "omop" / "public" / "batch_1" / "PROCEDURE_OCCURRENCE.parquet"
+    ).exists()
+    assert (
+        expected_public_parquet_dir / "omop" / "public" / "batch_2" / "PROCEDURE_OCCURRENCE.parquet"
+    ).exists()
+    assert (expected_public_parquet_dir / "radiology" / "radiology.parquet").exists()
 
 
 @pytest.mark.usefixtures("run_containers")
diff --git a/test/README.md b/test/README.md
index 450a06f2f..13a11abf1 100644
--- a/test/README.md
+++ b/test/README.md
@@ -35,7 +35,8 @@ A test `pixl_config.yml` file is provided to run the PIXL pipeline.
 ### Dummy services
 
 `./dummy-services` contains a Python script and Dockerfile to mock the CogStack service, used for
-de-identification of the radiology reports in the EHR API.
+de-identification of the radiology reports in the EHR API, and an FTP server for testing the
+DSH upload.
 
 #### FTP server
 
@@ -66,8 +67,14 @@ and are copied into `/etc/ssl/private` when building the Docker container.
 - `./resources/` provides 2 mock DICOM images used to populate the mock VNA and a JSON file of
   slice varying parameters from a 3D MRI sequence.
 
-- `./resources/omop` contains mock public and private Parquet files used to populate the message
-  queues and extract the radiology reports
+- `./resources/omop` contains several extract batches, each containing a config (log) file and
+  mock public and private Parquet files used to populate the message queues and extract the
+  radiology reports. System and unit tests can set up any subset of these batches, depending
+  on what they're testing. `batch_1` and `batch_2` together form the happy path for the
+  system test. `batch_3` has a mismatching timestamp, so it is supposed to fail when combined
+  with batch 1 or 2. Any batch can also be used alone as a single-batch (old-style) extract.
 
 ### VNA config
 
diff --git a/cli/tests/resources/omop/extract_summary.json b/test/resources/omop/batch_1/extract_summary.json
similarity index 100%
rename from cli/tests/resources/omop/extract_summary.json
rename to test/resources/omop/batch_1/extract_summary.json
diff --git a/test/resources/omop/batch_1/private/PERSON_LINKS.parquet b/test/resources/omop/batch_1/private/PERSON_LINKS.parquet
new file mode 100644
index 000000000..4381f36a5
Binary files /dev/null and b/test/resources/omop/batch_1/private/PERSON_LINKS.parquet differ
diff --git a/test/resources/omop/private/PROCEDURE_OCCURRENCE_LINKS.parquet b/test/resources/omop/batch_1/private/PROCEDURE_OCCURRENCE_LINKS.parquet
similarity index 69%
rename from test/resources/omop/private/PROCEDURE_OCCURRENCE_LINKS.parquet
rename to test/resources/omop/batch_1/private/PROCEDURE_OCCURRENCE_LINKS.parquet
index f4a1d6370..9dfa295a2 100644
Binary files a/test/resources/omop/private/PROCEDURE_OCCURRENCE_LINKS.parquet and b/test/resources/omop/batch_1/private/PROCEDURE_OCCURRENCE_LINKS.parquet differ
diff --git a/test/resources/omop/batch_1/public/PROCEDURE_OCCURRENCE.parquet b/test/resources/omop/batch_1/public/PROCEDURE_OCCURRENCE.parquet
new file mode 100644
index 000000000..e01bdcd71
Binary files /dev/null and b/test/resources/omop/batch_1/public/PROCEDURE_OCCURRENCE.parquet differ
diff --git a/test/resources/omop/extract_summary.json b/test/resources/omop/batch_2/extract_summary.json
similarity index 100%
rename from test/resources/omop/extract_summary.json
rename to test/resources/omop/batch_2/extract_summary.json
diff --git a/test/resources/omop/batch_2/private/PERSON_LINKS.parquet b/test/resources/omop/batch_2/private/PERSON_LINKS.parquet
new file mode 100644
index 000000000..33492439b
Binary files /dev/null and b/test/resources/omop/batch_2/private/PERSON_LINKS.parquet differ
diff --git a/cli/tests/resources/omop/private/PROCEDURE_OCCURRENCE_LINKS.parquet b/test/resources/omop/batch_2/private/PROCEDURE_OCCURRENCE_LINKS.parquet
similarity index 68%
rename from cli/tests/resources/omop/private/PROCEDURE_OCCURRENCE_LINKS.parquet
rename to test/resources/omop/batch_2/private/PROCEDURE_OCCURRENCE_LINKS.parquet
index f4a1d6370..3ca256502 100644
Binary files a/cli/tests/resources/omop/private/PROCEDURE_OCCURRENCE_LINKS.parquet and b/test/resources/omop/batch_2/private/PROCEDURE_OCCURRENCE_LINKS.parquet differ
diff --git a/test/resources/omop/batch_2/public/PROCEDURE_OCCURRENCE.parquet b/test/resources/omop/batch_2/public/PROCEDURE_OCCURRENCE.parquet
new file mode 100644
index 000000000..b66ca7f39
Binary files /dev/null and b/test/resources/omop/batch_2/public/PROCEDURE_OCCURRENCE.parquet differ
diff --git a/test/resources/omop/batch_3/extract_summary.json b/test/resources/omop/batch_3/extract_summary.json
new file mode 100644
index 000000000..996b7dff6
--- /dev/null
+++ b/test/resources/omop/batch_3/extract_summary.json
@@ -0,0 +1,94 @@
+{
+   "gitsha":"56e0eba8d098523c99f3c899979096d2c5ed4c5f",
+   "filesummaries":[
+      "CARE_SITE.parquet: 4084 bytes",
+      "CARE_SITE_BAD.parquet: 3305 bytes",
+      "CARE_SITE_LINKS.parquet: 1201 bytes",
+      "CDM_SOURCE.parquet: 5823 bytes",
+      "CDM_SOURCE_BAD.parquet: 7852 bytes",
+      "CONDITION_OCCURRENCE.parquet: 6770 bytes",
+      "CONDITION_OCCURRENCE_BAD.parquet: 5004 bytes",
+      "CONDITION_OCCURRENCE_LINKS.parquet: 612 bytes",
+      "DEVICE_EXPOSURE.parquet: 4524 bytes",
+      "DEVICE_EXPOSURE_BAD.parquet: 4524 bytes",
+      "DEVICE_EXPOSURE_LINKS.parquet: 682 bytes",
+      "DRUG_EXPOSURE.parquet: 5782 bytes",
+      "DRUG_EXPOSURE_BAD.parquet: 3907 bytes",
+      "DRUG_EXPOSURE_LINKS.parquet: 597 bytes",
+      "FACT_RELATIONSHIP.parquet: 2167 bytes",
+      "FACT_RELATIONSHIP_BAD.parquet: 1357 bytes",
+      "LOCATION.parquet: 1865 bytes",
+      "LOCATION_BAD.parquet: 1343 bytes",
+      "LOCATION_LINKS.parquet: 904 bytes",
+      "MEASUREMENT.parquet: 6742 bytes",
+      "MEASUREMENT_BAD.parquet: 3982 bytes",
+      "MEASUREMENT_LINKS.parquet: 2309 bytes",
+      "OBSERVATION.parquet: 5614 bytes",
+      "OBSERVATION_BAD.parquet: 3618 bytes",
+      "OBSERVATION_LINKS.parquet: 1263 bytes",
+      "OBSERVATION_PERIOD.parquet: 2183 bytes",
+      "OBSERVATION_PERIOD_BAD.parquet: 1488 bytes",
+      "OBSERVATION_PERIOD_LINKS.parquet: 606 bytes",
+      "PERSON.parquet: 5420 bytes",
+      "PERSON_BAD.parquet: 3614 bytes",
+      "PERSON_LINKS.parquet: 1953 bytes",
+      "PROCEDURE_OCCURRENCE.parquet: 5230 bytes",
+      "PROCEDURE_OCCURRENCE_BAD.parquet: 3665 bytes",
+      "PROCEDURE_OCCURRENCE_LINKS.parquet: 1311 bytes",
+      "SPECIMEN.parquet: 4873 bytes",
+      "SPECIMEN_BAD.parquet: 3326 bytes",
+      "SPECIMEN_LINKS.parquet: 928 bytes",
+      "VISIT_DETAIL.parquet: 3228 bytes",
+      "VISIT_DETAIL_BAD.parquet: 3228 bytes",
+      "VISIT_DETAIL_LINKS.parquet: 435 bytes",
+      "VISIT_OCCURRENCE.parquet: 5259 bytes",
+      "VISIT_OCCURRENCE_BAD.parquet: 3429 bytes",
+      "VISIT_OCCURRENCE_LINKS.parquet: 1349 bytes"
+   ],
+   "datetime":"2023-12-08 14:08:58",
+   "user":"John Watts",
+   "settings":{
+      "site":"UCLH",
+      "cdm_source_name":"Test Extract - UCLH OMOP CDM",
+      "cdm_source_abbreviation":"Test UCLH OMOP",
+      "project_logic":"mock_project_settings/project_logic.R",
+      "min_date": 20100101,
+      "max_date": 20241231,
+      "enabled_sources":"epic",
+      "output_format":"parquet",
+      "OMOP_version": 60,
+      "cohort":{
+         "file":"settings/mock_project_settings/mock_cohort.csv",
+         "exclude_NDOO": true,
+         "exclude_confidential": true,
+         "min_age_at_encounter_start": 16,
+         "max_age_at_encounter_start": 80
+      },
+      "keep_source_vals": false,
+      "person":{
+         "include_nhs_number": false,
+         "include_mrn": false,
+         "keep_day_of_birth": false,
+         "keep_month_of_birth": true,
+         "include_gp_as_primary_care_site": false
+      },
+      "observation_period_strategy":"visit_span",
+      "local_timezone":"Europe/London",
+      "output_timezone":"GMT",
+      "condition_occurrence":{
+         "include_sexual_health": false,
+         "allow_icd_as_std": true
+      },
+      "measurements":{
+         "include_file":null,
+         "include_measurement_concept_ids":null,
+         "non_generic_numeric_labs": 3040104
+      },
+      "location":{
+         "keep_only_zip": true,
+         "replace_postcode_with_LSOA": true
+      },
+      "mapping_effective_date": 19698,
+      "name":"mock_project_settings"
+   }
+}
80 + }, + "keep_source_vals": false, + "person":{ + "include_nhs_number": false, + "include_mrn": false, + "keep_day_of_birth": false, + "keep_month_of_birth": true, + "include_gp_as_primary_care_site": false + }, + "observation_period_strategy":"visit_span", + "local_timezone":"Europe/London", + "output_timezone":"GMT", + "condition_occurrence":{ + "include_sexual_health": false, + "allow_icd_as_std": true + }, + "measurements":{ + "include_file":null, + "include_measurement_concept_ids":null, + "non_generic_numeric_labs": 3040104 + }, + "location":{ + "keep_only_zip": true, + "replace_postcode_with_LSOA": true + }, + "mapping_effective_date": 19698, + "name":"mock_project_settings" + } +} diff --git a/test/resources/omop/batch_3/private/PERSON_LINKS.parquet b/test/resources/omop/batch_3/private/PERSON_LINKS.parquet new file mode 100644 index 000000000..33492439b Binary files /dev/null and b/test/resources/omop/batch_3/private/PERSON_LINKS.parquet differ diff --git a/test/resources/omop/batch_3/private/PROCEDURE_OCCURRENCE_LINKS.parquet b/test/resources/omop/batch_3/private/PROCEDURE_OCCURRENCE_LINKS.parquet new file mode 100644 index 000000000..9f1b2232c Binary files /dev/null and b/test/resources/omop/batch_3/private/PROCEDURE_OCCURRENCE_LINKS.parquet differ diff --git a/test/resources/omop/batch_3/public/PROCEDURE_OCCURRENCE.parquet b/test/resources/omop/batch_3/public/PROCEDURE_OCCURRENCE.parquet new file mode 100644 index 000000000..1763846ca Binary files /dev/null and b/test/resources/omop/batch_3/public/PROCEDURE_OCCURRENCE.parquet differ diff --git a/test/resources/omop/private/PERSON_LINKS.parquet b/test/resources/omop/private/PERSON_LINKS.parquet deleted file mode 100644 index bef07a1a2..000000000 Binary files a/test/resources/omop/private/PERSON_LINKS.parquet and /dev/null differ diff --git a/test/resources/omop/public/PROCEDURE_OCCURRENCE.parquet b/test/resources/omop/public/PROCEDURE_OCCURRENCE.parquet deleted file mode 100644 index 0b526f617..000000000 Binary files a/test/resources/omop/public/PROCEDURE_OCCURRENCE.parquet and /dev/null differ diff --git a/test/run-system-test.sh b/test/run-system-test.sh index 6422f1c2f..e34b7a62d 100755 --- a/test/run-system-test.sh +++ b/test/run-system-test.sh @@ -31,7 +31,10 @@ docker compose --env-file .env -p system-test up --wait -d --build --remove-orph pip install -e "${PACKAGE_DIR}/pixl_core" && pip install -e "${PACKAGE_DIR}/cli" pip install pydicom -pixl populate "${PACKAGE_DIR}/test/resources/omop" +# set up test input data - choose the two "good" batches +INPUT_TEMP_DIR=$(mktemp -d) +cp -a "${PACKAGE_DIR}/test/resources/omop/batch_"[12] "$INPUT_TEMP_DIR" +pixl populate "$INPUT_TEMP_DIR" pixl start # need to wait until the DICOM image is "stable" so poll for 2 minutes to check @@ -39,12 +42,11 @@ pixl start ./scripts/check_entry_in_pixl_anon.sh # test export and upload -pixl extract-radiology-reports "${PACKAGE_DIR}/test/resources/omop" +pixl extract-radiology-reports "$INPUT_TEMP_DIR" ./scripts/check_radiology_parquet.py \ ../exports/test-extract-uclh-omop-cdm/latest/radiology/radiology.parquet ./scripts/check_ftps_upload.py - ls -laR ../exports/ docker exec system-test-ehr-api-1 rm -r /run/exports/test-extract-uclh-omop-cdm/