ehr-api becomes export-api; losing and gaining some features (#370)
* Rename the `extract-radiology-reports` CLI command to `export-patient-data` to better reflect what it
now does, and to match the API endpoint name.

* Rename the EHR API to the export API

* Move a lot of code from the orthanc-anon plugin into
the ehr-api/export-api, plus a bit of config to get it working there

* Delete a large amount of code relating to querying Emap Star and Cogstack, including the fake servers

* Remove the `ehr` RabbitMQ queue

* Remove the `pixl az-copy-ehr` command and related code

* Clarify raw and slugified project names

* `CliRunner.invoke` does not raise on a failed command - the result has to be
checked manually, and not doing so was obscuring errors and making debugging harder
(a sketch follows this list).

* Simplify the query, and fix the detached-instance error by removing
the `session.begin` line (not completely sure why this fixes it; see the note after
the `cli/src/pixl_cli/_database.py` diff below).

* Write the radiology linker table

* Fix mypy errors

* Update docs in many places

* Rename or delete _EHR_ environment variables

* Move the last remaining bits of logging to loguru.

* Make timeout a CLI parameter
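
A minimal sketch of the `CliRunner` point above (the toy command and test are
illustrative, not the actual PIXL test code): `click.testing.CliRunner.invoke` catches
any exception raised by the command and records it on the result instead of re-raising,
so a test passes silently unless the exit code is asserted.

```python
import click
from click.testing import CliRunner


@click.command()
@click.argument("parquet_dir")
def populate(parquet_dir: str) -> None:
    """Toy stand-in for a PIXL CLI command that fails."""
    msg = f"No such extract directory: {parquet_dir}"
    raise FileNotFoundError(msg)


def test_populate_failure_is_visible() -> None:
    runner = CliRunner()
    result = runner.invoke(populate, ["/does/not/exist"])
    # invoke() does not re-raise, so without these asserts the failure would be silent
    assert result.exit_code != 0
    assert isinstance(result.exception, FileNotFoundError)
```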

---------

Co-authored-by: Milan Malfait <[email protected]>
Co-authored-by: Stef Piatek <[email protected]>
3 people authored Apr 30, 2024
1 parent 253cc6e commit 107c267
Showing 57 changed files with 572 additions and 1,582 deletions.
13 changes: 4 additions & 9 deletions .env.sample
@@ -24,15 +24,15 @@ ORTHANC_ANON_DICOM_PORT=
ORTHANC_ANON_WEB_PORT=
ORTHANC_RAW_DICOM_PORT=
ORTHANC_RAW_WEB_PORT=
PIXL_EHR_API_PORT=
PIXL_EXPORT_API_PORT=
RABBITMQ_PORT=
RABBITMQ_ADMIN_PORT=
PIXL_IMAGING_API_PORT=
FTP_PORT=

# PIXL EHR API
PIXL_EHR_API_HOST=localhost
PIXL_EHR_API_RATE=1
# PIXL EXPORT API
PIXL_EXPORT_API_HOST=localhost
PIXL_EXPORT_API_RATE=1

# PIXL Imaging API
PIXL_IMAGING_API_HOST=localhost
@@ -83,11 +83,6 @@ AZ_DICOM_ENDPOINT_CLIENT_ID=
AZ_DICOM_ENDPOINT_CLIENT_SECRET=
AZ_DICOM_ENDPOINT_TENANT_ID=

# EHR extraction API
PIXL_EHR_API_AZ_STORAGE_ACCOUNT_NAME=
PIXL_EHR_API_AZ_STORAGE_CONTAINER_NAME=
PIXL_EHR_COGSTACK_REDACT_URL=

# RABBIT MQ queue. UI available at localhost:$RABBITMQ_ADMIN_PORT
RABBITMQ_HOST=localhost
RABBITMQ_USERNAME=
10 changes: 5 additions & 5 deletions .github/workflows/main.yml
@@ -142,7 +142,7 @@ jobs:
run: |
pytest
ehr-api-tests:
export-api-tests:
runs-on: ubuntu-22.04
timeout-minutes: 30
steps:
@@ -155,10 +155,10 @@ jobs:

- name: Install Python dependencies
run: |
pip install pytest-pixl/ pixl_core/ pixl_ehr/[test]
pip install pytest-pixl/ pixl_core/ pixl_export/[test]
- name: Run tests
working-directory: pixl_ehr/tests
working-directory: pixl_export/tests
run: |
pytest
@@ -254,10 +254,10 @@ jobs:
./run-system-test.sh
echo FINISHED SYSTEM TEST SCRIPT
- name: Dump ehr-api docker logs for debugging
- name: Dump export-api docker logs for debugging
if: ${{ failure() }}
run: |
docker logs -t system-test-ehr-api-1 2>&1
docker logs -t system-test-export-api-1 2>&1
- name: Dump orthanc-anon docker logs for debugging
if: ${{ failure() }}
20 changes: 4 additions & 16 deletions README.md
@@ -72,7 +72,7 @@ Provides helper functions for de-identifying DICOM data

RDBMS which stores DICOM metadata, application data and anonymised patient record data.

### [Electronic Health Record Extractor](./pixl_ehr/README.md)
### [Electronic Health Record Extractor](pixl_export/README.md)

HTTP API to process messages from the `ehr` queue and populate raw and anon tables in the PIXL postgres instance.

@@ -106,9 +106,6 @@ You can leave them blank for other dev work.
- `PIXL_DB_`*
These are credentials for the containerised PostgreSQL service and are set in the official PostgreSQL image.
Use a strong password for `prod` deployment but the only requirement for other environments is consistency as several services interact with the database.
- `PIXL_EHR_API_AZ_`*
These credentials are used for uploading a PIXL database to Azure blob storage. They should be for a service principal that has `Storage Blob Data Contributor`
on the target storage account. The storage account must also allow network access from the PIXL host machine.

#### Ports

@@ -122,11 +119,6 @@ The maximum storage size of the `orthanc-raw` instance can be configured through
the specified value (in MB). When the storage is full [Orthanc will automatically recycle older
studies in favour of new ones](https://orthanc.uclouvain.be/book/faq/features.html#id8).

#### CogStack URL

For the deidentification of the EHR extracts, we rely on an instance running the
[CogStack API](https://cogstack.org/) with a `/redact` endpoint. The URL of this instance should be
set in `.env` as `COGSTACK_REDACT_URL`.

### 3. Configure a new project

@@ -244,12 +236,7 @@ and `ORTHANC_RAW_WEB_PORT` is defined in `.env`.
The number of reports and EHR can be interrogated by connecting to the PIXL
database with a database client (e.g. [DBeaver](https://dbeaver.io/)), using
the connection parameters defined in `.env`. For example, to find the number of
non-null reports
```sql
select count(*) from emap_data.ehr_anon where xray_report is not null;
```
the connection parameters defined in `.env`.
## Assumptions
@@ -284,7 +271,8 @@ EXTRACT_DIR/public /*.parquet
### PIXL Export dir (PIXL intermediate)
The directory where PIXL will copy the public OMOP extract files and radiology reports to.
The directory where PIXL will copy the public OMOP extract files (which now contain
the radiology reports) to.
These files will subsequently be uploaded to the `parquet` destination specified in the
[project config](#3-configure-a-new-project).
19 changes: 8 additions & 11 deletions cli/README.md
@@ -25,7 +25,7 @@ pip install -e ../pixl_core/ -e .

## Usage

> **Note** The `rabbitmq`, `ehr-api` and `imaging-api` services must be started prior to using the CLI
> **Note** The `rabbitmq`, `export-api` and `imaging-api` services must be started prior to using the CLI
> This is typically done by spinning up the necessary Docker containers through `docker compose`.
> For convenience, we provide the [`bin/pixldc`](../bin/pixldc) script to spin up the relevant
> services in production.
@@ -56,9 +56,9 @@ PIXL_DB_NAME=pixl
The `rabbitmq` queues for the `ehr` and `imaging` APIs are configured by setting:

```sh
PIXL_EHR_API_HOST=localhost
PIXL_EHR_API_PORT=7006
PIXL_EHR_API_RATE=1
PIXL_EXPORT_API_HOST=localhost
PIXL_EXPORT_API_PORT=7006
PIXL_EXPORT_API_RATE=1

PIXL_IMAGING_API_HOST=localhost
PIXL_IMAGING_API_PORT=7007
@@ -69,7 +69,7 @@ where the `*_RATE` variables set the default querying rate for the message queue

### Running the pipeline

Populate queue for Imaging and EHR extraction
Populate queue for Imaging

```bash
pixl populate </path/to/parquet_dir>
@@ -92,15 +92,13 @@ customisation of the rate per queue is required or a queue should not be started
then supply the argument `--no-start` and use `pixl start...` to launch
processing.

Once the messages have been processed, the radiology reports can be extracted and exported to a
`parquet file` using
Once the messages have been processed, the OMOP extracts (including radiology reports) can be
exported to a `parquet file` using

```sh
pixl extract-radiology-reports </path/to/parquet_dir>
pixl export-patient-data </path/to/parquet_dir>
```

This will also upload the radiology reports to the destination specified in the project config.

Stop Imaging and EHR database extraction

```bash
@@ -123,7 +121,6 @@ Options:
--help Show this message and exit.

Commands:
az-copy-ehr Copy the EHR data to azure
extract-radiology-reports Export processed radiology reports to...
kill Stop all the PIXL services
populate Populate a (set of) queue(s) from a parquet...
8 changes: 4 additions & 4 deletions cli/src/pixl_cli/_config.py
@@ -54,10 +54,10 @@ def base_url(self) -> str:


API_CONFIGS = {
"ehr_api": APIConfig(
host=config("PIXL_EHR_API_HOST"),
port=int(config("PIXL_EHR_API_PORT")),
default_rate=float(config("PIXL_EHR_API_RATE", default=1)),
"export_api": APIConfig(
host=config("PIXL_EXPORT_API_HOST"),
port=int(config("PIXL_EXPORT_API_PORT")),
default_rate=float(config("PIXL_EXPORT_API_RATE", default=1)),
),
"imaging_api": APIConfig(
host=config("PIXL_IMAGING_API_HOST"),
12 changes: 12 additions & 0 deletions cli/src/pixl_cli/_database.py
@@ -14,6 +14,8 @@

"""Interaction with the PIXL database."""

from typing import cast

from core.db.models import Extract, Image
from core.patient_queue.message import Message
from sqlalchemy import URL, create_engine
@@ -110,3 +112,13 @@ def _add_new_image_to_session(extract: Extract, message: Message, session: Sessi
)
session.add(new_image)
return new_image


def images_for_project(project_slug: str) -> list[Image]:
"""Given a project, get all images in the DB for that project."""
PixlSession = sessionmaker(engine)
with PixlSession() as session:
return cast(
list[Image],
session.query(Image).join(Extract).filter(Extract.slug == project_slug).all(),
)
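
A hedged note on the detached-instance bullet in the commit message, since the simplified
query lives in `images_for_project` above: with SQLAlchemy's default `expire_on_commit=True`,
a commit (which a `session.begin()` block issues on exit) expires every loaded attribute, so
touching the returned rows after the session closes raises `DetachedInstanceError`, whereas a
read-only session that never commits leaves the loaded values on the detached objects. A
minimal sketch of the two patterns, using a throwaway in-memory database rather than the real
PIXL one:

```python
from core.db.models import Extract, Image
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine("sqlite://")  # throwaway in-memory DB, not the real PIXL engine
Image.metadata.create_all(engine)  # declarative models share one MetaData object
PixlSession = sessionmaker(engine)

# Likely failure mode: begin() commits on exit, the commit expires the instances,
# and attribute access after the session closes raises DetachedInstanceError.
with PixlSession() as session, session.begin():
    images = session.query(Image).join(Extract).filter(Extract.slug == "my-project").all()

# Working pattern (as in images_for_project above): no commit, so already-loaded
# attribute values remain readable on the detached objects.
with PixlSession() as session:
    images = session.query(Image).join(Extract).filter(Extract.slug == "my-project").all()
if images:
    print(images[0].accession_number)
```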
58 changes: 40 additions & 18 deletions cli/src/pixl_cli/_io.py
@@ -18,14 +18,18 @@
import json
from datetime import UTC, datetime
from pathlib import Path
from typing import TYPE_CHECKING

import pandas as pd
from core.exports import ParquetExport
from core.patient_queue.message import Message
from loguru import logger

if TYPE_CHECKING:
from core.db.models import Image

# The export root dir from the point of view of the docker host (which is where the CLI runs)
# For the view from inside, see pixl_ehr/main.py: EHR_EXPORT_ROOT_DIR
# For the view from inside, see pixl_export/main.py: EXPORT_API_EXPORT_ROOT_DIR
HOST_EXPORT_ROOT_DIR = Path(__file__).parents[3] / "projects" / "exports"


@@ -43,8 +47,8 @@ def project_info(resources_path: Path) -> tuple[str, datetime]:

def copy_parquet_return_logfile_fields(resources_path: Path) -> tuple[str, datetime]:
"""Copy public parquet file to extracts directory, and return fields from logfile"""
project_name, extract_generated_timestamp = project_info(resources_path)
extract = ParquetExport(project_name, extract_generated_timestamp, HOST_EXPORT_ROOT_DIR)
project_name_raw, extract_generated_timestamp = project_info(resources_path)
extract = ParquetExport(project_name_raw, extract_generated_timestamp, HOST_EXPORT_ROOT_DIR)
project_name_slug = extract.copy_to_exports(resources_path)
return project_name_slug, extract_generated_timestamp

@@ -117,26 +121,14 @@ def messages_from_parquet(
private_dir = dir_path / "private"

cohort_data = _check_and_parse_parquet(private_dir, public_dir)

map_column_to_message_params = {
"mrn": "PrimaryMrn",
"accession_number": "AccessionNumber",
"study_date": "procedure_date",
"procedure_occurrence_id": "procedure_occurrence_id",
}

_raise_if_column_names_not_found(cohort_data, list(map_column_to_message_params.values()))
cohort_data_mapped = _map_columns(cohort_data)

messages = []

for _, row in cohort_data.iterrows():
message_params = {
param: row[column] for param, column in map_column_to_message_params.items()
}
for _, row in cohort_data_mapped.iterrows():
message = Message(
project_name=project_name,
extract_generated_timestamp=extract_generated_timestamp,
**message_params,
**{column: row[column] for column in MAP_PARQUET_TO_MESSAGE_KEYS.values()},
)
messages.append(message)

@@ -148,6 +140,19 @@
return messages


MAP_PARQUET_TO_MESSAGE_KEYS = {
"PrimaryMrn": "mrn",
"AccessionNumber": "accession_number",
"procedure_date": "study_date",
"procedure_occurrence_id": "procedure_occurrence_id",
}


def _map_columns(input_df: pd.DataFrame) -> pd.DataFrame:
_raise_if_column_names_not_found(input_df, list(MAP_PARQUET_TO_MESSAGE_KEYS.keys()))
return input_df.rename(MAP_PARQUET_TO_MESSAGE_KEYS, axis=1)


def _check_and_parse_parquet(private_dir: Path, public_dir: Path) -> pd.DataFrame:
for d in [public_dir, private_dir]:
if not d.is_dir():
@@ -167,6 +172,23 @@ def _check_and_parse_parquet(private_dir: Path, public_dir: Path) -> pd.DataFram
return people_procedures_accessions[~people_procedures_accessions["AccessionNumber"].isna()]


def make_radiology_linker_table(parquet_dir: Path, images: list[Image]) -> pd.DataFrame:
"""
Make a table linking the OMOP procedure_occurrence_id to the hashed image/study ID.
:param parquet_dir: location of OMOP extract
(this gives us: procedure_occurrence_id <-> mrn+accession mapping)
:param images: the images already processed by PIXL, from the DB
(this gives us: mrn+accession <-> hashed ID)
"""
public_dir = parquet_dir / "public"
private_dir = parquet_dir / "private"
people_procedures_accessions = _map_columns(_check_and_parse_parquet(private_dir, public_dir))

images_df = pd.DataFrame.from_records([vars(im) for im in images])
merged = people_procedures_accessions.merge(images_df, on=("mrn", "accession_number"))
return merged[["procedure_occurrence_id", "hashed_identifier"]]


def _raise_if_column_names_not_found(
cohort_data: pd.DataFrame, expected_col_names: list[str]
) -> None:
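
As a usage illustration of the two helpers added in this commit (`images_for_project` in
`_database.py` and `make_radiology_linker_table` above), a hypothetical sketch of building
the linker table for one project; the project slug and output path are made up, and the
parquet directory must contain the `public`/`private` OMOP subdirectories expected by
`_check_and_parse_parquet`:

```python
from pathlib import Path

from pixl_cli._database import images_for_project
from pixl_cli._io import make_radiology_linker_table

# Hypothetical extract location and project slug
parquet_dir = Path("projects/exports/my-project/latest/omop")
images = images_for_project("my-project")

# Two columns: procedure_occurrence_id and hashed_identifier
linker = make_radiology_linker_table(parquet_dir, images)
linker.to_parquet(parquet_dir / "radiology_linker.parquet", index=False)
```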