
Add documentation for Parquet files and export process (#280)
* Add documentation for Parquet files and export process

* Formatting

* Move `TODO` to issue #306

* Remove PR references

* Formatting

* Move specific details to `pixl_core` docs and add links

* Update directory structure on the FTPS server

* Formatting

* Rename docs/data -> docs/file_types

* Link to `file_types` documentation

* Add directory structures to docstrings

* Update upload.py

Co-authored-by: Stef Piatek <[email protected]>

* Fix docs link

Co-authored-by: Jeremy Stein <[email protected]>

* Clarify that the radiology reports go through Cogstack

Co-authored-by: Jeremy Stein <[email protected]>

* Add note about test files

Co-authored-by: Jeremy Stein <[email protected]>

---------

Co-authored-by: Stef Piatek <[email protected]>
Co-authored-by: Jeremy Stein <[email protected]>
3 people committed Feb 22, 2024
1 parent 582c995 commit c24e86c
Showing 6 changed files with 119 additions and 17 deletions.
10 changes: 8 additions & 2 deletions README.md
@@ -249,33 +249,39 @@ PIXL data extracts include the below assumptions
- (MRN, Accession number) is a unique identifier for a report/DICOM study pair
- Patients have a single _relevant_ MRN



## File journey overview

The files present at each step of the pipeline are listed below.

A more detailed description of the relevant file types is available in [`docs/file_types/parquet_files.md`](./docs/file_types/parquet_files.md).

### Resources in source repo (for test only)

```
test/resources/omop/public /*.parquet
....................private/*.parquet
....................extract_summary.json
```

### OMOP ES extract dir (input to PIXL)

EXTRACT_DIR is the directory passed to `pixl populate`

```
EXTRACT_DIR/public /*.parquet
............private/*.parquet
............extract_summary.json
```
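The expected layout can be sanity-checked before running `pixl populate` — a minimal sketch, assuming only the directory listing above (the `check_extract_dir` helper is hypothetical, not part of PIXL):

```python
from pathlib import Path


def check_extract_dir(extract_dir: Path) -> None:
    """Verify an OMOP ES extract directory has the layout PIXL expects."""
    public = extract_dir / "public"
    private = extract_dir / "private"
    summary = extract_dir / "extract_summary.json"
    if not public.is_dir() or not any(public.glob("*.parquet")):
        raise FileNotFoundError(f"No public parquet files in {public}")
    if not private.is_dir():
        raise FileNotFoundError(f"Missing private subdirectory: {private}")
    if not summary.is_file():
        raise FileNotFoundError(f"Missing summary file: {summary}")
```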

### PIXL Export dir (PIXL intermediate)

```
EXPORT_ROOT/PROJECT_SLUG/all_extracts/EXTRACT_DATETIME/radiology/radiology.parquet
....................................................../omop/public/*.parquet
```

### FTP server

```
FTPROOT/PROJECT_SLUG/EXTRACT_DATETIME/parquet/radiology/radiology.parquet
..............................................omop/public/*.parquet
```
46 changes: 46 additions & 0 deletions docs/file_types/parquet_files.md
@@ -0,0 +1,46 @@
# Parquet files you might encounter throughout PIXL

## OMOP-ES files

From
[OMOP-ES](https://github.com/UCLH-Foundry/the-rolling-skeleton/blob/main/docs/design/100-day-design.md#data-flow-through-components)
we receive parquet files defining the data we need to export. These input files come in two groups:

1. **Public** parquet files: have had identifiers removed and replaced with a sequential ID for the
export
2. **Private** parquet files: map sequential identifiers to patient identifiers (e.g. MRNs,
Accession numbers, NHS numbers)
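Conceptually, the private files let a key-holder join real identifiers back onto the public data via the sequential ID — a toy illustration with pandas (the column names here are illustrative, not the actual OMOP ES schema):

```python
import pandas as pd

# Public extract: identifiers replaced with a sequential ID (illustrative columns)
public = pd.DataFrame({"person_id": [1, 2], "procedure": ["CT head", "MRI spine"]})

# Private mapping: sequential ID back to real patient identifiers
private = pd.DataFrame({"person_id": [1, 2], "mrn": ["12345", "67890"]})

# Only someone holding the private files can re-identify the public data
reidentified = public.merge(private, on="person_id")
print(reidentified["mrn"].tolist())  # ['12345', '67890']
```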

## Radiology reports

The PIXL pipeline generates **Radiology** parquet files, which
contain the radiology reports for the given extract. These are generated by calling the CogStack API, which returns a de-identified radiology report given a full radiology report.

The functionality for this is defined in the [EHR API](../../pixl_ehr/README.md), specifically in
[`PIXLDatabase.get_radiology_reports`](../../pixl_ehr/src/pixl_ehr/_databases.py), which queries the
PIXL database for the de-identified radiology reports of the current extract and collects them
in a single _parquet_ file together with the `image_identifier` and `procedure_occurrence_id`.
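The shape of the resulting file can be sketched as follows — a toy stand-in for the rows collected from the PIXL database, where any column name beyond `image_identifier` and `procedure_occurrence_id` is an assumption:

```python
import pandas as pd

# Illustrative rows: one de-identified report per imaging study
reports = pd.DataFrame(
    {
        "image_identifier": ["abc123", "def456"],
        "procedure_occurrence_id": [101, 102],
        "deidentified_report": ["No acute findings.", "Stable appearances."],
    }
)

# In PIXL the collected frame is written out as a single parquet file, e.g.:
# reports.to_parquet("radiology.parquet", index=False)
print(list(reports.columns))
```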

## Exporting (copying from OMOP ES)

As part of the PIXL pipeline, we copy the OMOP-ES public _parquet_ files to an export directory, to
prepare them for upload to the DSH. The exporting details are in the
[`pixl_core` documentation](../../pixl_core/README.md#omop-es-files).
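In outline, this step mirrors the public parquet files into the timestamped export tree — a simplified sketch of the copy, with paths flattened for brevity (the `copy_public_parquet` helper is hypothetical; the real logic lives in `pixl_core`):

```python
import shutil
from pathlib import Path


def copy_public_parquet(input_omop_dir: Path, export_base: Path) -> None:
    """Copy the OMOP-ES public parquet files into <export_base>/omop/public."""
    public_input = input_omop_dir / "public"
    if not public_input.is_dir():
        raise FileNotFoundError(f"No public subdirectory in {input_omop_dir}")
    destination = export_base / "omop" / "public"
    destination.mkdir(parents=True, exist_ok=True)
    for parquet_file in public_input.glob("*.parquet"):
        shutil.copy(parquet_file, destination / parquet_file.name)
```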

## Uploading to the DSH

The final step in the journey of the _parquet_ files is to upload them to the DSH. This is
implemented and documented in [`pixl_core`](../../pixl_core/README.md#uploading-to-an-ftps-server).

## Testing

Various _parquet_ files are provided throughout the repo to enable unit and system testing:

- `cli/tests/resources/omop/` contains public and private parquet files together with an
`extract_summary.json` file to mimic the input received from OMOP-ES for the unit tests. (This directory is identical to that below and should be deleted at some point).
- `test/resources/omop/` contains public and private parquet files together with an
`extract_summary.json` file to mimic the input received from OMOP-ES for the system tests

During the system test, a `radiology.parquet` file is generated and temporarily stored in
`exports/test-extract-uclh-omop-cdm/latest/radiology/radiology.parquet` to check the successful
de-identification before the DSH upload. This file is then deleted after the test.
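A de-identification check of this kind can be as simple as asserting that no known patient identifier survives in any report body — a hypothetical sketch, not the actual system test:

```python
import pandas as pd


def assert_deidentified(reports: pd.DataFrame, known_identifiers: list[str]) -> None:
    """Fail if any known patient identifier appears in a report body."""
    for report in reports["deidentified_report"]:
        for identifier in known_identifiers:
            assert identifier not in report, f"Identifier {identifier!r} leaked"


# Illustrative data: a clean report passes silently
reports = pd.DataFrame({"deidentified_report": ["No acute findings."]})
assert_deidentified(reports, ["John Smith", "12345"])
```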
8 changes: 4 additions & 4 deletions docs/services/ftp-server.md
@@ -7,10 +7,10 @@ The [`core.upload`](../../pixl_core/src/core/upload.py) module implements functionality to upload
DICOM tags and parquet files to an **FTPS server**. This requires the following environment
variables to be set:

- `FTP_HOST`: URL to the FTPS server
- `FTP_PORT`: port on which the FTPS server is listening
- `FTP_USER_NAME`: name of user with access to the FTPS server
- `FTP_USER_PASSWORD`: password for the authorised user
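A minimal sketch of how these variables might be consumed when opening the connection, using Python's standard `ftplib` (this is an assumption for illustration; PIXL's actual implementation lives in `core.upload`):

```python
import os
from ftplib import FTP_TLS


def connect_ftps() -> FTP_TLS:
    """Open an FTPS connection using the PIXL environment variables."""
    ftp = FTP_TLS()
    ftp.connect(os.environ["FTP_HOST"], int(os.environ["FTP_PORT"]))
    ftp.login(os.environ["FTP_USER_NAME"], os.environ["FTP_USER_PASSWORD"])
    ftp.prot_p()  # encrypt the data channel as well as the control channel
    return ftp
```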

We provide mock values for these for the unit tests (see
[`./tests/conftest.py`](./tests/conftest.py)). When running in production, these should be defined
43 changes: 33 additions & 10 deletions pixl_core/README.md
@@ -6,13 +6,13 @@ upstream services.

Specifically, it defines:

- The [Token buffer](#token-buffer) for rate limiting requests to the upstream services
- The [RabbitMQ queue](#patient-queue) implementation shared by the EHR and Imaging APIs
- The PIXL `postgres` internal database for storing exported images and extracts from the messages
  processed by the CLI driver
- The [`ParquetExport`](./src/core/exports.py) class for exporting OMOP and EMAP extracts to
  parquet files
- Handling of [uploads over FTPS](./src/core/upload.py), used to transfer images and parquet files
  to the DSH (Data Safe Haven)

## Installation
@@ -90,14 +90,37 @@ for convenience `latest` is a symlink to the most recent extract.

## Uploading to an FTPS server

The `core.upload` module implements functionality to upload DICOM images and parquet files to an
**FTPS server**. This requires the following environment variables to be set:

- `FTP_HOST`: URL to the FTPS server
- `FTP_PORT`: port on which the FTPS server is listening
- `FTP_USER_NAME`: name of user with access to the FTPS server
- `FTP_USER_PASSWORD`: password for the authorised user

We provide mock values for these for the unit tests (see
[`./tests/conftest.py`](./tests/conftest.py)). When running in production, these should be defined
in the `.env` file (see [the example](../.env.sample)).

When an extract is ready to be published to the DSH, the PIXL pipeline will upload the **Public**
and **Radiology** [_parquet_ files](../docs/file_types/parquet_files.md) to the `<project-slug>` directory
where the DICOM datasets are stored (see the directory structure below). The uploading is controlled
by `upload_parquet_files` in [`upload.py`](./src/core/upload.py) which takes a `ParquetExport`
object as input to define where the _parquet_ files are located. `upload_parquet_files` is called
by the `export-patient-data` API endpoint defined in the
[EHR API](../pixl_ehr/src/pixl_ehr/main.py), which in turn is called by the `extract_radiology_reports` command in the [PIXL CLI](../cli/README.md).
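The remote destination described above can be derived from the export metadata — a simplified sketch in which the `remote_parquet_dir` helper and the datetime-slug format are hypothetical (the real path handling is in `core.upload`):

```python
from pathlib import PurePosixPath


def remote_parquet_dir(project_slug: str, extract_datetime_slug: str) -> PurePosixPath:
    """Build the FTPS destination <project-slug>/<extract-datetime>/parquet."""
    return PurePosixPath(project_slug) / extract_datetime_slug / "parquet"


print(remote_parquet_dir("test-extract-uclh-omop-cdm", "2024-02-22t12-00-00"))
# test-extract-uclh-omop-cdm/2024-02-22t12-00-00/parquet
```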

Once the parquet files have been uploaded to the DSH, the directory structure will look like this:

```sh
<project-slug>
├── <extract_datetime_slug>
│   └── parquet
│       ├── omop
│       │   └── public
│       │       └── PROCEDURE_OCCURRENCE.parquet
│       └── radiology
│           └── radiology.parquet
├── <pseudonymised_ID_DICOM_dataset_1>.zip
└── <pseudonymised_ID_DICOM_dataset_2>.zip
```
12 changes: 12 additions & 0 deletions pixl_core/src/core/exports.py
@@ -65,6 +65,18 @@ def copy_to_exports(self, input_omop_dir: pathlib.Path) -> str:
:param input_omop_dir: parent path for input omop data, with a "public" subdirectory
:raises FileNotFoundError: if there is no public subdirectory in `omop_dir`
:returns str: the project slug, so this can be registered for export to the DSH
The final directory structure will look like this:
exports
└── <project_slug>
    ├── all_extracts
    │   └── <extract_datetime_slug>
    │       ├── omop
    │       │   └── public
    │       │       └── PROCEDURE_OCCURRENCE.parquet
    │       └── radiology
    │           └── radiology.parquet
    └── latest -> </symlink/to/latest/extract>
"""
public_input = input_omop_dir / "public"

17 changes: 16 additions & 1 deletion pixl_core/src/core/upload.py
@@ -90,7 +90,22 @@ def upload_dicom_image(zip_content: BinaryIO, pseudo_anon_id: str) -> None:


def upload_parquet_files(parquet_export: ParquetExport) -> None:
"""Upload parquet to FTPS under <project name>/<extract datetime>/parquet."""
"""
Upload parquet to FTPS under <project name>/<extract datetime>/parquet.
:param parquet_export: instance of the ParquetExport class
The final directory structure will look like this:
<project-slug>
├── <extract_datetime_slug>
│   └── parquet
│       ├── omop
│       │   └── public
│       │       └── PROCEDURE_OCCURRENCE.parquet
│       └── radiology
│           └── radiology.parquet
├── <pseudonymised_ID_DICOM_dataset_1>.zip
└── <pseudonymised_ID_DICOM_dataset_2>.zip
...
"""
logger.info("Starting FTPS upload of files for '%s'", parquet_export.project_slug)

source_root_dir = parquet_export.current_extract_base
