From c24e86c6519ed1576fbba3994330513a6bac3893 Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Tue, 20 Feb 2024 12:50:05 +0000 Subject: [PATCH] Add documentation for Parquet files and export process (#280) * Add documentation for Parquet files and export process * Formatting * Move `TODO` to issue #306 * Remove PR references * Formatting * Move specific details to `pixl_core` docs and add links * Update directory structure on the FTPS server * Formatting * Rename docs/data -> docs/file_types * Link to `file_types` documentation * Add directory structures to docstrings * Update upload.py Co-authored-by: Stef Piatek * Fix docs link Co-authored-by: Jeremy Stein * Clarify that the radiology reports go through Cogstack Co-authored-by: Jeremy Stein * Add note about test files Co-authored-by: Jeremy Stein --------- Co-authored-by: Stef Piatek Co-authored-by: Jeremy Stein --- README.md | 10 +++++-- docs/file_types/parquet_files.md | 46 ++++++++++++++++++++++++++++++++ docs/services/ftp-server.md | 8 +++--- pixl_core/README.md | 43 ++++++++++++++++++++++------- pixl_core/src/core/exports.py | 12 +++++++++ pixl_core/src/core/upload.py | 17 +++++++++++- 6 files changed, 119 insertions(+), 17 deletions(-) create mode 100644 docs/file_types/parquet_files.md diff --git a/README.md b/README.md index 0b1f8d5d1..2b4573e9d 100644 --- a/README.md +++ b/README.md @@ -249,12 +249,14 @@ PIXL data extracts include the below assumptions - (MRN, Accession number) is unique identifier for a report/DICOM study pair - Patients have a single _relevant_ MRN - - ## File journey overview + Files that are present at each step of the pipeline. +A more detailed description of the relevant file types is available in [`docs/file_types/parquet_files.md`](./docs/file_types/parquet_files.md). + ### Resources in source repo (for test only) + ``` test/resources/omop/public /*.parquet ....................private/*.parquet @@ -262,7 +264,9 @@ test/resources/omop/public /*.parquet ``` ### OMOP ES extract dir (input to PIXL) + EXTRACT_DIR is the directory passed to `pixl populate` + ``` EXTRACT_DIR/public /*.parquet ............private/*.parquet @@ -270,12 +274,14 @@ EXTRACT_DIR/public /*.parquet ``` ### PIXL Export dir (PIXL intermediate) + ``` EXPORT_ROOT/PROJECT_SLUG/all_extracts/EXTRACT_DATETIME/radiology/radiology.parquet ....................................................../omop/public/*.parquet ``` ### FTP server + ``` FTPROOT/PROJECT_SLUG/EXTRACT_DATETIME/parquet/radiology/radiology.parquet ..............................................omop/public/*.parquet diff --git a/docs/file_types/parquet_files.md b/docs/file_types/parquet_files.md new file mode 100644 index 000000000..0502202fe --- /dev/null +++ b/docs/file_types/parquet_files.md @@ -0,0 +1,46 @@ +# Parquet files you might encounter throughout PIXL + +## OMOP-ES files + +From +[OMOP-ES](https://github.com/UCLH-Foundry/the-rolling-skeleton/blob/main/docs/design/100-day-design.md#data-flow-through-components) +we receive parquet files defining the data we need to export. These input files appear as 2 groups: + +1. **Public** parquet files: have had identifiers removed and replaced with a sequential ID for the + export +2. **Private** parquet files: map sequential identifiers to patient identifiers (e.g. MRNs, + Accession numbers, NHS numbers) + +## Radiology reports + +The PIXL pipeline generates **Radiology** parquet files, which +contain the radiology reports for the given extract. These are generated by calling the CogStack API, which returns a de-identified radiology report given a full radiology report. + +The functionality for this is defined in the [EHR API](../../pixl_ehr/README.md), specifically in +[`PIXLDatabase.get_radiology_reports`](../../pixl_ehr/src/pixl_ehr/_databases.py), which queries the +PIXL database for the de-identified radiology reports of the current extract and collects them +in a single _parquet_ file together with the `image_identifier` and `procedure_occurrence_id`. + +## Exporting (copying from OMOP ES) + +As part of the PIXL pipeline, we copy the OMOP-ES public _parquet_ files to an export directory, to +prepare them for upload to the DSH. The exporting details are in the +[`pixl_core` documentation](../../pixl_core/README.md#omop-es-files). + +## Uploading to the DSH + +The final step in the journey of the _parquet_ files is to upload them to the DSH. This is +implemented and documented in [`pixl_core`](../../pixl_core/README.md#uploading-to-an-ftps-server). + +## Testing + +Various _parquet_ files are provided throughout the repo to enable unit and system testing: + +- `cli/tests/resources/omop/` contains public and private parquet files together with an + `extract_summary.json` file to mimic the input received from OMOP-ES for the unit tests. (This directory is identical to that below and should be deleted at some point). +- `test/resources/omop/` contains public and private parquet files together with an + `extract_summary.json` file to mimic the input received from OMOP-ES for the system tests + +During the system test, a `radiology.parquet` file is generated and temporarily stored in +`exports/test-extract-uclh-omop-cdm/latest/radiology/radiology.parquet` to check the successful +de-identification before the DSH upload. This file is then deleted after the test. diff --git a/docs/services/ftp-server.md b/docs/services/ftp-server.md index 43258c32e..7564f281b 100644 --- a/docs/services/ftp-server.md +++ b/docs/services/ftp-server.md @@ -7,10 +7,10 @@ The [`core.upload`](../../pixl_core/src/core/upload.py) module implements functi DICOM tags and parquet files to an **FTPS server**. This requires the following environment variables to be set: -- `FTP_HOST`: URL to the FTPS server -- `FTP_PORT`: port on which the FTPS server is listening -- `FTP_USER_NAME`: name of user with access to the FTPS server -- `FTP_USER_PASSWORD`: password for the authorised user +- `FTP_HOST`: URL to the FTPS server +- `FTP_PORT`: port on which the FTPS server is listening +- `FTP_USER_NAME`: name of user with access to the FTPS server +- `FTP_USER_PASSWORD`: password for the authorised user We provide mock values for these for the unit tests (see [`./tests/conftest.py`](./tests/conftest.py)). When running in production, these should be defined diff --git a/pixl_core/README.md b/pixl_core/README.md index 417baaf07..cf669fe87 100644 --- a/pixl_core/README.md +++ b/pixl_core/README.md @@ -6,13 +6,13 @@ upstream services. Specifically, it defines: -- The [Token buffer](#token-buffer) for rate limiting requests to the upstream services -- The [RabbitMQ queue](#patient-queue) implementation shared by the EHR and Imaging APIs -- The PIXL `postgres` internal database for storing exported images and extracts from the messages +- The [Token buffer](#token-buffer) for rate limiting requests to the upstream services +- The [RabbitMQ queue](#patient-queue) implementation shared by the EHR and Imaging APIs +- The PIXL `postgres` internal database for storing exported images and extracts from the messages processed by the CLI driver -- The [`ParquetExport`](./src/core/exports.py) class for exporting OMOP and EMAP extracts to +- The [`ParquetExport`](./src/core/exports.py) class for exporting OMOP and EMAP extracts to parquet files -- Handling of [uploads over FTPS](./src/core/upload.py), used to transfer images and parquet files +- Handling of [uploads over FTPS](./src/core/upload.py), used to transfer images and parquet files to the DSH (Data Safe Haven) ## Installation @@ -90,14 +90,37 @@ for convenience `latest` is a symlink to the most recent extract. ## Uploading to an FTPS server -The `core.upload` module implements functionality to upload DICOM tags and parquet files to an +The `core.upload` module implements functionality to upload DICOM images and parquet files to an **FTPS server**. This requires the following environment variables to be set: -- `FTP_HOST`: URL to the FTPS server -- `FTP_PORT`: port on which the FTPS server is listening -- `FTP_USER_NAME`: name of user with access to the FTPS server -- `FTP_USER_PASSWORD`: password for the authorised user +- `FTP_HOST`: URL to the FTPS server +- `FTP_PORT`: port on which the FTPS server is listening +- `FTP_USER_NAME`: name of user with access to the FTPS server +- `FTP_USER_PASSWORD`: password for the authorised user We provide mock values for these for the unit tests (see [`./tests/conftest.py`](./tests/conftest.py)). When running in production, these should be defined in the `.env` file (see [the example](../.env.sample)). + +When an extract is ready to be published to the DSH, the PIXL pipeline will upload the **Public** +and **Radiology** [_parquet_ files](../docs/data/parquet_files.md) to the `` directory +where the DICOM datasets are stored (see the directory structure below). The uploading is controlled +by `upload_parquet_files` in [`upload.py`](./src/core/upload.py) which takes a `ParquetExport` +object as input to define where the _parquet_ files are located. `upload_parquet_files` is called +by the `export-patient-data` API endpoint defined in the +[EHR API](../pixl_ehr/src/pixl_ehr/main.py), which in turn is called by the `extract_radiology_reports` command in the [PIXL CLI](../cli/README.md). + +Once the parquet files have been uploaded to the DSH, the directory structure will look like this: + +```sh + + ├── + │   └── parquet + │   ├── omop + │   │   └── public + │   │   └── PROCEDURE_OCCURRENCE.parquet + │   └── radiology + │   └── radiology.parquet + ├── .zip + └── .zip +``` diff --git a/pixl_core/src/core/exports.py b/pixl_core/src/core/exports.py index b9a013d2d..20bcaf0b4 100644 --- a/pixl_core/src/core/exports.py +++ b/pixl_core/src/core/exports.py @@ -65,6 +65,18 @@ def copy_to_exports(self, input_omop_dir: pathlib.Path) -> str: :param input_omop_dir: parent path for input omop data, with a "public" subdirectory :raises FileNotFoundError: if there is no public subdirectory in `omop_dir` :returns str: the project slug, so this can be registered for export to the DSH + + The final directory structure will look like this: + exports + └── + ├── all_extracts + │ └── + │ ├── omop + │ │ └── public + │ │ └── PROCEDURE_OCCURRENCE.parquet + │ └── radiology + │ └── radiology.parquet + └── latest -> """ public_input = input_omop_dir / "public" diff --git a/pixl_core/src/core/upload.py b/pixl_core/src/core/upload.py index db5c08e4b..34e4e1333 100644 --- a/pixl_core/src/core/upload.py +++ b/pixl_core/src/core/upload.py @@ -90,7 +90,22 @@ def upload_dicom_image(zip_content: BinaryIO, pseudo_anon_id: str) -> None: def upload_parquet_files(parquet_export: ParquetExport) -> None: - """Upload parquet to FTPS under //parquet.""" + """ + Upload parquet to FTPS under //parquet. + :param parquet_export: instance of the ParquetExport class + The final directory structure will look like this: + + ├── + │ └── parquet + │ ├── omop + │ │ └── public + │ │ └── PROCEDURE_OCCURRENCE.parquet + │ └── radiology + │ └── radiology.parquet + ├── .zip + └── .zip + ... + """ logger.info("Starting FTPS upload of files for '%s'", parquet_export.project_slug) source_root_dir = parquet_export.current_extract_base