ehr-api becomes export-api; losing and gaining some features #370

Merged · 59 commits · Apr 30, 2024

Changes from 45 commits

Commits

ed6edb9
Rename extract-radiology-reports CLI command to export-patient-data t…
jeremyestein Apr 11, 2024
f2f867a
Another rename
jeremyestein Apr 11, 2024
146f513
Move most of the code in the orthanc-anon plugin into
jeremyestein Apr 11, 2024
42a8b88
Check HTTP return code in dicom-web upload (not used yet)
jeremyestein Apr 11, 2024
9fc091f
Use fastapi properly and fix some typing errors
jeremyestein Apr 11, 2024
5342333
Only FTP code should have been moved
jeremyestein Apr 12, 2024
410b718
Keep main tidy
jeremyestein Apr 12, 2024
c3f8cbd
Remove code relating to querying Emap for radiology reports
jeremyestein Apr 12, 2024
36ff768
Merge branch 'main' into jeremy/export-api
jeremyestein Apr 12, 2024
1f1cde0
Delete large amount of code relating to querying Emap Star and Cogstack
jeremyestein Apr 15, 2024
224a04e
Delete fake emap star
jeremyestein Apr 15, 2024
f0b466f
Rename ehr api to export api (except config env vars)
jeremyestein Apr 15, 2024
f6ddb87
Remove ehr rabbitmq queue
jeremyestein Apr 15, 2024
762bd5e
Merge branch 'main' into jeremy/export-api
jeremyestein Apr 15, 2024
9e95f9b
Remove pixl az-copy-ehr command and related code
jeremyestein Apr 15, 2024
041842b
docs fix
jeremyestein Apr 15, 2024
eee2541
Merge branch 'main' into jeremy/export-api
jeremyestein Apr 19, 2024
3451546
Merge branch 'main' into jeremy/export-api
jeremyestein Apr 19, 2024
347f25c
Merge branch 'main' into jeremy/export-api
jeremyestein Apr 22, 2024
0f04488
First go at making the radiology linker table. The DB lookup is failing
jeremyestein Apr 24, 2024
02ffaa3
Use CliRunner so can debug in IDE more easily
jeremyestein Apr 24, 2024
84c0677
Clarify raw and slugified names
jeremyestein Apr 24, 2024
3b4de7c
Tidy logging
jeremyestein Apr 24, 2024
138c45d
Fix mypy error
jeremyestein Apr 24, 2024
b985e40
Rename
jeremyestein Apr 24, 2024
7962df2
Refactor the test a bit
jeremyestein Apr 24, 2024
666d604
Add test to detect missing radiology linker table, and a couple of
jeremyestein Apr 25, 2024
961cc9a
Use CliRunner for better debugging
jeremyestein Apr 25, 2024
d08cb5e
CliRunner invoke does not throw on a failed command - this needs to be
jeremyestein Apr 25, 2024
426f350
Use sqlalchemy objects better so the code actually works
jeremyestein Apr 25, 2024
9c9ddab
Go back to using subprocess for CLI calls. CliRunner mixes badly with
jeremyestein Apr 25, 2024
7ae0cb3
Make query simpler, and fix the detached instance error by removing
jeremyestein Apr 26, 2024
f807daa
Write radiology linker table
jeremyestein Apr 26, 2024
2d24d46
Fix linker table system test
jeremyestein Apr 26, 2024
220e317
Merge commit '7f722e8' into jeremy/export-api
jeremyestein Apr 26, 2024
ea15b20
Merge commit '2f5b307' into jeremy/export-api
jeremyestein Apr 26, 2024
7543db0
Merge branch 'main' into jeremy/export-api
jeremyestein Apr 26, 2024
1327370
Fix core test
jeremyestein Apr 26, 2024
1ed30d1
Fix mypy errors
jeremyestein Apr 26, 2024
09d379d
Remove reminder to test DB query, but add some data to the fake data
jeremyestein Apr 26, 2024
cce7d3b
Tidy docs
jeremyestein Apr 26, 2024
27a56f8
Remove redundant assert
jeremyestein Apr 26, 2024
f873aad
Reinstate deleted comment
jeremyestein Apr 26, 2024
ae53905
Reinstate deleted check for radiology parquet file to check for linker
jeremyestein Apr 26, 2024
e79f85e
Some docs changes were missed
jeremyestein Apr 26, 2024
8ff11e1
Rename or delete _EHR_ environment variables and fix various docs
jeremyestein Apr 26, 2024
41b199b
Fix (?) `test_dicomweb_upload` in sytem test
milanmlft Apr 26, 2024
244c2d0
Rework dicomweb
stefpiatek Apr 26, 2024
165f700
Update dicomweb tests after rework
milanmlft Apr 26, 2024
1bc0a38
Remove bool cast as per review
jeremyestein Apr 29, 2024
f8a544f
Move last remaining bits to loguru. Not sure if the loguru config is
jeremyestein Apr 29, 2024
ca82df2
Configure loguru at the app entry point
jeremyestein Apr 29, 2024
211612d
Make timeout a CLI parameter
jeremyestein Apr 29, 2024
84be9af
Revert "Fix (?) `test_dicomweb_upload` in sytem test"
jeremyestein Apr 29, 2024
86ae3ab
Revert "Rework dicomweb"
jeremyestein Apr 29, 2024
e0c220e
Revert "Update dicomweb tests after rework"
jeremyestein Apr 29, 2024
4de926e
Update docs/file_types/parquet_files.md
jeremyestein Apr 30, 2024
50a8dc9
Remove confusing comment
jeremyestein Apr 30, 2024
6a997b8
Reinstate comment
jeremyestein Apr 30, 2024
5 changes: 0 additions & 5 deletions .env.sample
@@ -83,11 +83,6 @@ AZ_DICOM_ENDPOINT_CLIENT_ID=
AZ_DICOM_ENDPOINT_CLIENT_SECRET=
AZ_DICOM_ENDPOINT_TENANT_ID=

# EHR extraction API
PIXL_EHR_API_AZ_STORAGE_ACCOUNT_NAME=
PIXL_EHR_API_AZ_STORAGE_CONTAINER_NAME=
PIXL_EHR_COGSTACK_REDACT_URL=

# RABBIT MQ queue. UI available at localhost:$RABBITMQ_ADMIN_PORT
RABBITMQ_HOST=localhost
RABBITMQ_USERNAME=
10 changes: 5 additions & 5 deletions .github/workflows/main.yml
@@ -142,7 +142,7 @@ jobs:
run: |
pytest

ehr-api-tests:
export-api-tests:
runs-on: ubuntu-22.04
timeout-minutes: 30
steps:
@@ -155,10 +155,10 @@

- name: Install Python dependencies
run: |
pip install pytest-pixl/ pixl_core/ pixl_ehr/[test]
pip install pytest-pixl/ pixl_core/ pixl_export/[test]

- name: Run tests
working-directory: pixl_ehr/tests
working-directory: pixl_export/tests
run: |
pytest

@@ -254,10 +254,10 @@ jobs:
./run-system-test.sh
echo FINISHED SYSTEM TEST SCRIPT

- name: Dump ehr-api docker logs for debugging
- name: Dump export-api docker logs for debugging
if: ${{ failure() }}
run: |
docker logs -t system-test-ehr-api-1 2>&1
docker logs -t system-test-export-api-1 2>&1

- name: Dump orthanc-anon docker logs for debugging
if: ${{ failure() }}
10 changes: 3 additions & 7 deletions README.md
@@ -72,7 +72,7 @@ Provides helper functions for de-identifying DICOM data

RDBMS which stores DICOM metadata, application data and anonymised patient record data.

### [Electronic Health Record Extractor](./pixl_ehr/README.md)
### [Electronic Health Record Extractor](pixl_export/README.md)

HTTP API to process messages from the `ehr` queue and populate raw and anon tables in the PIXL postgres instance.

@@ -122,11 +122,6 @@ The maximum storage size of the `orthanc-raw` instance can be configured through
the specified value (in MB). When the storage is full [Orthanc will automatically recycle older
studies in favour of new ones](https://orthanc.uclouvain.be/book/faq/features.html#id8).

#### CogStack URL

For the deidentification of the EHR extracts, we rely on an instance running the
[CogStack API](https://cogstack.org/) with a `/redact` endpoint. The URL of this instance should be
set in `.env` as `COGSTACK_REDACT_URL`.

### 3. Configure a new project

@@ -284,7 +279,8 @@ EXTRACT_DIR/public/*.parquet

### PIXL Export dir (PIXL intermediate)

The directory where PIXL will copy the public OMOP extract files and radiology reports to.
The directory where PIXL will copy the public OMOP extract files (which now contain
the radiology reports) to.
These files will subsequently be uploaded to the `parquet` destination specified in the
[project config](#3-configure-a-new-project).

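The export-dir behaviour described above is implemented by `ParquetExport` in `pixl_core`, whose usage appears in the `cli/src/pixl_cli/_io.py` changes further down. A minimal sketch, assuming only the constructor and `copy_to_exports` call shown in that diff; the project name and paths here are illustrative:

```python
# Minimal sketch: copy an OMOP extract into the PIXL export dir.
# Constructor arguments follow the usage in _io.py in this PR;
# the project name and paths are illustrative assumptions.
from datetime import UTC, datetime
from pathlib import Path

from core.exports import ParquetExport

export = ParquetExport("My Project", datetime.now(tz=UTC), Path("projects/exports"))

# copy_to_exports copies the public parquet files and returns the slugified name
slug = export.copy_to_exports(Path("/data/omop-extract"))
print(f"Copied extract for project slug: {slug}")
```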
13 changes: 5 additions & 8 deletions cli/README.md
@@ -25,7 +25,7 @@ pip install -e ../pixl_core/ -e .

## Usage

> **Note** The `rabbitmq`, `ehr-api` and `imaging-api` services must be started prior to using the CLI
> **Note** The `rabbitmq`, `export-api` and `imaging-api` services must be started prior to using the CLI
> This is typically done by spinning up the necessary Docker containers through `docker compose`.
> For convenience, we provide the [`bin/pixldc`](../bin/pixldc) script to spin up the relevant
> services in production.
@@ -69,7 +69,7 @@ where the `*_RATE` variables set the default querying rate for the message queues

### Running the pipeline

Populate queue for Imaging and EHR extraction
Populate queue for Imaging

```bash
pixl populate </path/to/parquet_dir>
@@ -92,15 +92,13 @@ customisation of the rate per queue is required or a queue should not be started
then supply the argument `--no-start` and use `pixl start...` to launch
processing.

Once the messages have been processed, the radiology reports can be extracted and exported to a
`parquet file` using
Once the messages have been processed, the OMOP extracts (including radiology reports) can be
exported to a `parquet file` using

```sh
pixl extract-radiology-reports </path/to/parquet_dir>
pixl export-patient-data </path/to/parquet_dir>
```

This will also upload the radiology reports to the destination specified in the project config.

Stop Imaging and EHR database extraction

```bash
@@ -123,7 +121,6 @@ Options:
--help Show this message and exit.

Commands:
az-copy-ehr Copy the EHR data to azure
extract-radiology-reports Export processed radiology reports to...
kill Stop all the PIXL services
populate Populate a (set of) queue(s) from a parquet...
2 changes: 1 addition & 1 deletion cli/src/pixl_cli/_config.py
@@ -54,7 +54,7 @@ def base_url(self) -> str:


API_CONFIGS = {
"ehr_api": APIConfig(
"export_api": APIConfig(
host=config("PIXL_EHR_API_HOST"),
port=int(config("PIXL_EHR_API_PORT")),
default_rate=float(config("PIXL_EHR_API_RATE", default=1)),
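As a point of reference, a minimal sketch of how the renamed entry is consumed. `api_config_for_queue` itself is not part of this diff, so its mapping of `"export"` to the `"export_api"` key is an assumption inferred from the call sites in `cli/src/pixl_cli/main.py` below; note that the underlying environment variables keep their `PIXL_EHR_API_*` names for now:

```python
# Sketch only: assumes api_config_for_queue("export") resolves to the
# "export_api" APIConfig registered above (the helper is not shown here).
from pixl_cli._config import api_config_for_queue

api_config = api_config_for_queue("export")
# base_url combines host and port, still sourced from PIXL_EHR_API_HOST/_PORT
print(api_config.base_url)
```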
12 changes: 12 additions & 0 deletions cli/src/pixl_cli/_database.py
@@ -14,6 +14,8 @@

"""Interaction with the PIXL database."""

from typing import cast

from core.db.models import Extract, Image
from core.patient_queue.message import Message
from sqlalchemy import URL, create_engine
@@ -110,3 +112,13 @@ def _add_new_image_to_session(extract: Extract, message: Message, session: Session
)
session.add(new_image)
return new_image


def images_for_project(project_slug: str) -> list[Image]:
"""Given a project, get all images in the DB for that project."""
PixlSession = sessionmaker(engine)
with PixlSession() as session:
return cast(
list[Image],
session.query(Image).join(Extract).filter(Extract.slug == project_slug).all(),
)
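A hypothetical usage of the new helper; the slug is illustrative, and the printed attributes are the `Image` fields that the linker-table code in `_io.py` relies on:

```python
# Illustrative only: list all processed images for one project slug.
from pixl_cli._database import images_for_project

for image in images_for_project("my-project-slug"):
    # mrn, accession_number and hashed_identifier are the Image attributes
    # consumed by make_radiology_linker_table in this PR
    print(image.mrn, image.accession_number, image.hashed_identifier)
```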
58 changes: 40 additions & 18 deletions cli/src/pixl_cli/_io.py
@@ -18,14 +18,18 @@
import json
from datetime import UTC, datetime
from pathlib import Path
from typing import TYPE_CHECKING

import pandas as pd
from core.exports import ParquetExport
from core.patient_queue.message import Message
from loguru import logger

if TYPE_CHECKING:
from core.db.models import Image

# The export root dir from the point of view of the docker host (which is where the CLI runs)
# For the view from inside, see pixl_ehr/main.py: EHR_EXPORT_ROOT_DIR
# For the view from inside, see pixl_export/main.py: EHR_EXPORT_ROOT_DIR
HOST_EXPORT_ROOT_DIR = Path(__file__).parents[3] / "projects" / "exports"


@@ -43,8 +47,8 @@ def project_info(resources_path: Path) -> tuple[str, datetime]:

def copy_parquet_return_logfile_fields(resources_path: Path) -> tuple[str, datetime]:
"""Copy public parquet file to extracts directory, and return fields from logfile"""
project_name, extract_generated_timestamp = project_info(resources_path)
extract = ParquetExport(project_name, extract_generated_timestamp, HOST_EXPORT_ROOT_DIR)
project_name_raw, extract_generated_timestamp = project_info(resources_path)
extract = ParquetExport(project_name_raw, extract_generated_timestamp, HOST_EXPORT_ROOT_DIR)
project_name_slug = extract.copy_to_exports(resources_path)
return project_name_slug, extract_generated_timestamp

@@ -117,26 +121,14 @@ def messages_from_parquet(
private_dir = dir_path / "private"

cohort_data = _check_and_parse_parquet(private_dir, public_dir)

map_column_to_message_params = {
"mrn": "PrimaryMrn",
"accession_number": "AccessionNumber",
"study_date": "procedure_date",
"procedure_occurrence_id": "procedure_occurrence_id",
}

_raise_if_column_names_not_found(cohort_data, list(map_column_to_message_params.values()))
cohort_data_mapped = _map_columns(cohort_data)

messages = []

for _, row in cohort_data.iterrows():
message_params = {
param: row[column] for param, column in map_column_to_message_params.items()
}
for _, row in cohort_data_mapped.iterrows():
message = Message(
project_name=project_name,
extract_generated_timestamp=extract_generated_timestamp,
**message_params,
**{column: row[column] for column in MAP_PARQUET_TO_MESSAGE_KEYS.values()},
)
messages.append(message)

@@ -148,6 +140,19 @@
return messages


MAP_PARQUET_TO_MESSAGE_KEYS = {
"PrimaryMrn": "mrn",
"AccessionNumber": "accession_number",
"procedure_date": "study_date",
"procedure_occurrence_id": "procedure_occurrence_id",
}


def _map_columns(input_df: pd.DataFrame) -> pd.DataFrame:
_raise_if_column_names_not_found(input_df, list(MAP_PARQUET_TO_MESSAGE_KEYS.keys()))
return input_df.rename(MAP_PARQUET_TO_MESSAGE_KEYS, axis=1)


def _check_and_parse_parquet(private_dir: Path, public_dir: Path) -> pd.DataFrame:
for d in [public_dir, private_dir]:
if not d.is_dir():
@@ -167,6 +172,23 @@ def _check_and_parse_parquet(private_dir: Path, public_dir: Path) -> pd.DataFrame
return people_procedures_accessions[~people_procedures_accessions["AccessionNumber"].isna()]


def make_radiology_linker_table(parquet_dir: Path, images: list[Image]) -> pd.DataFrame:
"""
Make a table linking the OMOP procedure_occurrence_id to the hashed image/study ID.
:param parquet_dir: location of OMOP extract
(this gives us: procedure_occurrence_id <-> mrn+accession mapping)
:param images: the images already processed by PIXL, from the DB
(this gives us: mrn+accession <-> hashed ID)
"""
public_dir = parquet_dir / "public"
private_dir = parquet_dir / "private"
people_procedures_accessions = _map_columns(_check_and_parse_parquet(private_dir, public_dir))

images_df = pd.DataFrame.from_records([vars(im) for im in images])
merged = people_procedures_accessions.merge(images_df, on=("mrn", "accession_number"))
return merged[["procedure_occurrence_id", "hashed_identifier"]]


def _raise_if_column_names_not_found(
cohort_data: pd.DataFrame, expected_col_names: list[str]
) -> None:
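To make the new linker-table logic concrete, here is a toy, self-contained illustration (synthetic data, not from the repo) of the join that `make_radiology_linker_table` performs:

```python
# Toy illustration of the linker-table join: OMOP procedure_occurrence_id is
# linked to the hashed study identifier via the (mrn, accession_number) pair.
import pandas as pd

people_procedures = pd.DataFrame(
    {
        "mrn": ["40800000", "98765432"],
        "accession_number": ["AA12345601", "AA12345605"],
        "procedure_occurrence_id": [1, 2],
    }
)
images = pd.DataFrame(
    {
        "mrn": ["40800000", "98765432"],
        "accession_number": ["AA12345601", "AA12345605"],
        "hashed_identifier": ["hash-1", "hash-2"],
    }
)

linker = people_procedures.merge(images, on=["mrn", "accession_number"])
print(linker[["procedure_occurrence_id", "hashed_identifier"]])
```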
46 changes: 21 additions & 25 deletions cli/src/pixl_cli/main.py
@@ -24,14 +24,17 @@

import click
import requests
from core.exports import ParquetExport
from core.patient_queue.producer import PixlProducer
from decouple import RepositoryEnv, UndefinedValueError, config
from loguru import logger

from pixl_cli._config import SERVICE_SETTINGS, api_config_for_queue
from pixl_cli._database import filter_exported_or_add_to_db
from pixl_cli._database import filter_exported_or_add_to_db, images_for_project
from pixl_cli._io import (
HOST_EXPORT_ROOT_DIR,
copy_parquet_return_logfile_fields,
make_radiology_linker_table,
messages_from_csv,
messages_from_parquet,
project_info,
@@ -82,7 +85,7 @@ def check_env(*, error: bool, sample_env_file: click.Path) -> None:
@cli.command()
@click.option(
"--queues",
default="imaging,ehr",
default="imaging",
show_default=True,
help="Comma seperated list of queues to populate with messages generated from the "
"input file(s)",
@@ -146,20 +149,25 @@
@click.argument(
"parquet-dir", required=True, type=click.Path(path_type=Path, exists=True, file_okay=False)
)
def extract_radiology_reports(parquet_dir: Path) -> None:
def export_patient_data(parquet_dir: Path) -> None:
"""
Export processed radiology reports to parquet file.

PARQUET_DIR: Directory containing the extract_summary.json
log file defining which extract to export radiology reports for.
PARQUET_DIR: Directory containing the extract_summary.json log file
defining which extract to export patient data for. (not a CSV)
"""
project_name, omop_es_datetime = project_info(parquet_dir)
project_name_raw, omop_es_datetime = project_info(parquet_dir)
export = ParquetExport(project_name_raw, omop_es_datetime, HOST_EXPORT_ROOT_DIR)

# Call the EHR API
api_config = api_config_for_queue("ehr")
images = images_for_project(export.project_slug)
linker_data = make_radiology_linker_table(parquet_dir, images)
export.export_radiology_linker(linker_data)

# Call the Export API
api_config = api_config_for_queue("export")
response = requests.post(
url=f"{api_config.base_url}/export-patient-data",
json={"project_name": project_name, "extract_datetime": omop_es_datetime.isoformat()},
json={"project_name": project_name_raw, "extract_datetime": omop_es_datetime.isoformat()},
timeout=300,
jeremyestein marked this conversation as resolved.
Show resolved Hide resolved
)

@@ -175,7 +183,7 @@ def extract_radiology_reports(parquet_dir: Path) -> None:
@cli.command()
@click.option(
"--queues",
default="ehr,imaging",
default="imaging",
show_default=True,
help="Comma seperated list of queues to start consuming from",
)
@@ -197,7 +205,7 @@ def start(queues: str, rate: Optional[float]) -> None:
@cli.command()
@click.option(
"--queues",
default="ehr,imaging",
default="imaging",
show_default=True,
help="Comma seperated list of queues to update the consume rate of",
)
@@ -250,7 +258,7 @@ def _update_extract_rate(queue_name: str, rate: Optional[float]) -> None:
@cli.command()
@click.option(
"--queues",
default="ehr,imaging",
default="imaging",
show_default=True,
help="Comma seperated list of queues to consume messages from",
)
@@ -286,7 +294,7 @@ def kill() -> None:
@cli.command()
@click.option(
"--queues",
default="ehr,imaging",
default="imaging",
show_default=True,
help="Comma seperated list of queues to consume messages from",
)
@@ -296,18 +304,6 @@ def status(queues: str) -> None:
logger.info(f"[{queue:^10s}] refresh rate = ", _get_extract_rate(queue))


@cli.command()
def az_copy_ehr() -> None:
"""Copy the EHR data to azure"""
api_config = api_config_for_queue("ehr")
response = requests.get(url=f"{api_config.base_url}/az-copy-current", timeout=10)

success_code = 200
if response.status_code != success_code:
msg = f"Failed to run az copy due to: {response.text}"
raise RuntimeError(msg)


def _get_extract_rate(queue_name: str) -> str:
"""
Get the extraction rate in items per second from a queue
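Taken together, the reworked command reduces to building the linker table and making a single POST against the export API. A sketch of the equivalent request; the host and port are assumptions, while the endpoint, payload keys and timeout come from the diff above:

```python
# Sketch of the HTTP call made by `pixl export-patient-data`; the URL host and
# port are assumed, the endpoint, payload shape and timeout mirror the diff.
import requests

payload = {
    "project_name": "My Project",                    # raw (un-slugified) name
    "extract_datetime": "2024-04-30T00:00:00+00:00", # ISO-format timestamp
}
response = requests.post(
    "http://localhost:7006/export-patient-data",  # assumed host:port
    json=payload,
    timeout=300,
)

if response.status_code != 200:
    msg = f"Failed to export patient data: {response.text}"
    raise RuntimeError(msg)
```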