Support for OMOP ES batches #274

Closed
wants to merge 26 commits into from
Commits (26)
a40860d
Firstly refactor tests so that each test can generate the correct bat…
jeremyestein Jan 31, 2024
cc4a3ea
Merge branch 'main' into jeremy/omop-batch-support
jeremyestein Jan 31, 2024
0a39b2f
Temporarily point to correct single batch directory. Will change this
jeremyestein Jan 31, 2024
7c628c7
Split procedure occurrence test data between two batches. Because one
jeremyestein Feb 1, 2024
e2d0917
Dodgy batch that doesn't match the project name + timestamp of the
jeremyestein Feb 1, 2024
74c1db1
Update tests, implementation not quite there yet
jeremyestein Jan 31, 2024
b9ce649
Need to write to batch dir on export, too
jeremyestein Feb 1, 2024
f55fb92
Reading and copying extracts must be done on a batch-by-batch basis.
jeremyestein Feb 2, 2024
c68d1f3
Set up the system test so it has all the original data, which is now
jeremyestein Feb 2, 2024
e7dcd35
Merge branch 'main' into jeremy/omop-batch-support
jeremyestein Feb 2, 2024
0362496
Explain what the different batches are for
jeremyestein Feb 2, 2024
6fba6c5
Minor typing error
jeremyestein Feb 2, 2024
0d15136
Increase test coverage slightly; not sure if worth it.
jeremyestein Feb 2, 2024
d253f34
Review suggestions: log batch details, sort globbed batch dirs.
jeremyestein Feb 5, 2024
6cff5a2
Document the single and multi batch extracts that are accepted by the…
jeremyestein Feb 5, 2024
2a5e81e
Merge branch 'main' into jeremy/omop-batch-support
jeremyestein Feb 5, 2024
21db43d
Check that truncated parquet files stop the process.
jeremyestein Feb 5, 2024
c83e7d1
Merge branch 'main' into jeremy/omop-batch-support
jeremyestein Feb 5, 2024
9cb88d1
Fix up merge, new code in main needs to be OMOP ES batch-aware.
jeremyestein Feb 5, 2024
c9ccbbf
Merge branch 'main' into jeremy/omop-batch-support
jeremyestein Feb 6, 2024
2b3a33b
Fix test to be batch aware. It now (correctly) fails.
jeremyestein Feb 7, 2024
99c9d58
Capture docker output for easier debugging
jeremyestein Feb 7, 2024
e35f76f
Merge branch 'main' into jeremy/omop-batch-support
jeremyestein Feb 7, 2024
fe9cc9d
Merge branch 'main' into jeremy/omop-batch-support
jeremyestein Feb 7, 2024
f871583
Missed a dependency
jeremyestein Feb 7, 2024
421f91f
FTP upload everything as found. Document the files present at each stage
jeremyestein Feb 8, 2024
39 changes: 39 additions & 0 deletions README.md
@@ -248,3 +248,42 @@ PIXL data extracts include the below assumptions

- (MRN, Accession number) is unique identifier for a report/DICOM study pair
- Patients have a single _relevant_ MRN



## File journey overview
Files that are present at each step of the pipeline.

### Resources in source repo (for test only)
For building test versions of the OMOP ES extract dir.
```
test/resources/omop/batch_1/...
....................batch_2/...
....................batch_3/public /*.parquet
............................private/*.parquet
............................extract_summary.json
```

### OMOP ES extract dir (input to PIXL)
As passed to CLI.
Square brackets (e.g. `[batch_1]`) denote an optional intermediate directory.
```
EXTRACT_DIR/[batch_1]/...
.........../[batch_2]/public /*.parquet
......................private/*.parquet
......................extract_summary.json
```

### PIXL Export dir (PIXL intermediate)
```
EXPORT_ROOT/PROJECT_SLUG/all_extracts/EXTRACT_DATETIME/radiology/radiology.parquet
....................................................../omop/public/[batch_1]/*.parquet
...................................................................[batch_2]/*.parquet
```

### FTP server
```
FTPROOT/PROJECT_SLUG/EXTRACT_DATETIME/parquet/radiology/radiology.parquet
..............................................omop/public/[batch_1]/*.parquet
..........................................................[batch_2]/*.parquet
```
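
The two destination layouts above follow the same pattern and can be sketched programmatically. A minimal illustration — the concrete slug, timestamp, and root values below are made-up placeholders for the capitalised names in the trees:

```python
from pathlib import PurePosixPath

# Hypothetical placeholder values for the capitalised names in the trees above.
export_root = PurePosixPath("/exports")      # EXPORT_ROOT
ftp_root = PurePosixPath("/ftproot")         # FTPROOT
project_slug = "really-great-cool-project"   # PROJECT_SLUG
extract_datetime = "2020-06-10t18-00-00"     # EXTRACT_DATETIME

# PIXL export destination for one batch's public parquet files
export_public = (export_root / project_slug / "all_extracts" / extract_datetime
                 / "omop" / "public" / "batch_1")

# The FTP server mirrors the same layout under FTPROOT/.../parquet/
ftp_public = (ftp_root / project_slug / extract_datetime
              / "parquet" / "omop" / "public" / "batch_1")

print(export_public)
print(ftp_public)
```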
39 changes: 30 additions & 9 deletions cli/README.md
@@ -44,21 +44,42 @@ pixl --help
Populate queue for Imaging and EHR extraction

```bash
pixl populate </path/to/parquet_dir>
pixl populate PARQUET_DIR
```

where `parquet_dir` contains at least the following files:
where `PARQUET_DIR` uses one of the following two layouts, which the CLI detects automatically:

* In single-batch mode:
```sh
parquet_dir
├── extract_summary.json
├── private
│ ├── PERSON_LINKS.parquet
│ └── PROCEDURE_OCCURRENCE_LINKS.parquet
└── public
└── PROCEDURE_OCCURRENCE.parquet
└── PARQUET_DIR
├── extract_summary.json
├── private
│ ├── PERSON_LINKS.parquet
│ └── PROCEDURE_OCCURRENCE_LINKS.parquet
└── public
└── PROCEDURE_OCCURRENCE.parquet
```

* In multi-batch mode:
```sh
└── PARQUET_DIR
├── batch_1
│ ├── extract_summary.json
│ ├── private
│ │ ├── PERSON_LINKS.parquet
│ │ └── PROCEDURE_OCCURRENCE_LINKS.parquet
│ └── public
│ └── PROCEDURE_OCCURRENCE.parquet
└── batch_2
├── extract_summary.json
├── private
│ ├── PERSON_LINKS.parquet
│ └── PROCEDURE_OCCURRENCE_LINKS.parquet
└── public
└── PROCEDURE_OCCURRENCE.parquet
```
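
The auto-detection distinguishes the two layouts by where `extract_summary.json` lives. A simplified standalone sketch of that logic (not the actual `pixl_cli` implementation, which also validates the log files):

```python
import tempfile
from pathlib import Path


def detect_batches(extract_path: Path) -> list[Path]:
    """Return the batch directories: the top-level dir itself for a
    single-batch layout, otherwise all batch_* subdirectories, sorted."""
    if (extract_path / "extract_summary.json").exists():
        return [extract_path]
    batch_dirs = sorted(extract_path.glob("batch_*"))
    if not batch_dirs:
        msg = f"No batched or unbatched log files found in {extract_path}"
        raise RuntimeError(msg)
    return batch_dirs


# Demonstrate on a throwaway multi-batch layout
root = Path(tempfile.mkdtemp())
for name in ("batch_2", "batch_1"):
    (root / name).mkdir()
    (root / name / "extract_summary.json").touch()
print([p.name for p in detect_batches(root)])  # sorted, so batch_1 first
```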


Start the imaging extraction

```bash
73 changes: 66 additions & 7 deletions cli/src/pixl_cli/_io.py
@@ -47,20 +47,79 @@ def messages_from_state_file(filepath: Path) -> list[Message]:
return [deserialise(line) for line in filepath.open().readlines() if string_is_non_empty(line)]


def config_from_log_file(parquet_path: Path) -> tuple[str, datetime]:
log_file = parquet_path / "extract_summary.json"
def determine_batch_structure(extract_path: Path) -> tuple[str, datetime, list[Path]]:
Contributor review comment: maybe `parse_batch_structure`?
"""
Take the top-level extract directory, determine whether it is a single- or multi-batch
setup, and return the resulting metadata.
:param extract_path: the top-level path that contains either the single batch or one or
more directories containing batches.
:returns: ( project name common to all the batches,
extract timestamp common to all the batches,
list of directories containing batches
- can be just the top level directory in the case of a single batch
)
:raises RuntimeError: if config files are non-existent or contradictory
"""
log_filename = "extract_summary.json"
single_batch_logfile = extract_path / log_filename
if single_batch_logfile.exists():
project_name, omop_es_timestamp = _config_from_log_file(single_batch_logfile)
return project_name, omop_es_timestamp, [extract_path]

# should it really be 'extract_*'?
# The order doesn't matter functionally, but sort here so failures are more repeatable.
batch_dirs = sorted(extract_path.glob("batch_*"))
extract_ids = [_config_from_log_file(log_file / log_filename) for log_file in batch_dirs]
# There should be one or more log files, all with the same identifiers
if not extract_ids:
err = f"No batched or unbatched log files found in {extract_path}"
raise RuntimeError(err)

distinct_extract_ids = set(extract_ids)
if len(distinct_extract_ids) != 1:
err = (
f"Found {len(batch_dirs)} log files with different IDs: "
f"Batch dirs: {batch_dirs}, IDs: {extract_ids}"
)
raise RuntimeError(err)

logger.info(f"Batches found: ({len(batch_dirs)}) {batch_dirs}")
project_name, omop_es_timestamp = distinct_extract_ids.pop()
return project_name, omop_es_timestamp, batch_dirs


def _config_from_log_file(log_file: Path) -> tuple[str, datetime]:
"""
Load the project name and extract timestamp from the given log file. This helper has
no knowledge of multi- vs single-batch layouts, so it should not be called directly.
"""
logs = json.load(log_file.open())
project_name = logs["settings"]["cdm_source_name"]
omop_es_timestamp = datetime.fromisoformat(logs["datetime"])
return project_name, omop_es_timestamp
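
The log-file shape implied by `_config_from_log_file` can be exercised in isolation. The field names `settings.cdm_source_name` and `datetime` are taken from the code above; the surrounding values are made up for illustration:

```python
import json
import tempfile
from datetime import datetime
from pathlib import Path

# Write a minimal, hypothetical extract_summary.json
log_file = Path(tempfile.mkdtemp()) / "extract_summary.json"
log_file.write_text(json.dumps({
    "settings": {"cdm_source_name": "Really great cool project"},
    "datetime": "2020-06-10T18:00:00",
}))

# Parse it the same way the helper does
logs = json.load(log_file.open())
project_name = logs["settings"]["cdm_source_name"]
omop_es_timestamp = datetime.fromisoformat(logs["datetime"])
print(project_name, omop_es_timestamp.isoformat())
```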


def copy_parquet_return_logfile_fields(parquet_path: Path) -> tuple[str, datetime]:
"""Copy public parquet file to extracts directory, and return fields from logfile"""
project_name, omop_es_timestamp = config_from_log_file(parquet_path)
def copy_parquet_return_logfile_fields(extract_path: Path) -> tuple[str, datetime, list[Path]]:
"""
Copy public parquet file to extracts directory, and return fields from logfile
:param extract_path: top-level dir for the OMOP ES extract
(either a single or multi batch extract)
:returns: ( slugified project name of all the batches,
extract timestamp of all the batches,
list of directories containing batches
- can be just the top level directory in the case of a single batch
)
:raises RuntimeError: if no log file can be found, or there is more than one batch and
project names or timestamps don't match.
"""
project_name, omop_es_timestamp, batch_dirs = determine_batch_structure(extract_path)
extract = ParquetExport(project_name, omop_es_timestamp, HOST_EXPORT_ROOT_DIR)
project_name_slug = extract.copy_to_exports(parquet_path)
return project_name_slug, omop_es_timestamp
for batch in batch_dirs:
batch_subdir = batch.relative_to(extract_path)
logger.info(f"Copying extract files from batch at {batch}...")
extract.copy_to_exports(batch, batch_subdir)
logger.info(f"Done copying extract files from batch at {batch}")
return extract.project_slug, omop_es_timestamp, batch_dirs
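
The `batch.relative_to(extract_path)` call above computes the subdirectory name to preserve under the export destination; in the single-batch case the batch dir *is* the extract dir, so it yields `.` and files land directly in the export dir. A minimal illustration of that `pathlib` behaviour:

```python
from pathlib import PurePosixPath

extract_path = PurePosixPath("/data/extract")  # hypothetical extract root

# Multi-batch: each batch keeps its directory name under the export root
batch = extract_path / "batch_1"
print(batch.relative_to(extract_path))  # batch_1

# Single-batch: the batch dir is the extract dir, so the subdir is "."
print(extract_path.relative_to(extract_path))  # .
```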


def messages_from_parquet(
12 changes: 7 additions & 5 deletions cli/src/pixl_cli/main.py
@@ -28,8 +28,8 @@
from pixl_cli._config import cli_config
from pixl_cli._database import filter_exported_or_add_to_db
from pixl_cli._io import (
config_from_log_file,
copy_parquet_return_logfile_fields,
determine_batch_structure,
messages_from_parquet,
messages_from_state_file,
)
@@ -80,8 +80,10 @@ def populate(parquet_dir: Path, *, restart: bool, queues: str) -> None:
└── extract_summary.json
"""
logger.info(f"Populating queue(s) {queues} from {parquet_dir}")
project_name, omop_es_datetime = copy_parquet_return_logfile_fields(parquet_dir)
messages = messages_from_parquet(parquet_dir, project_name, omop_es_datetime)
project_name, omop_es_datetime, batch_dirs = copy_parquet_return_logfile_fields(parquet_dir)
messages = []
for batch in batch_dirs:
messages.extend(messages_from_parquet(batch, project_name, omop_es_datetime))

for queue in queues.split(","):
state_filepath = state_filepath_for_queue(queue)
@@ -113,13 +115,13 @@ def extract_radiology_reports(parquet_dir: Path) -> None:
PARQUET_DIR: Directory containing the extract_summary.json
log file defining which extract to export radiology reports for.
"""
project_name, omop_es_datetime = config_from_log_file(parquet_dir)
project_name, omop_es_timestamp, _batch_dirs = determine_batch_structure(parquet_dir)

# Call the EHR API
api_config = api_config_for_queue("ehr")
response = requests.post(
url=f"{api_config.base_url}/export-patient-data",
json={"project_name": project_name, "extract_datetime": omop_es_datetime.isoformat()},
json={"project_name": project_name, "extract_datetime": omop_es_timestamp.isoformat()},
timeout=10,
)

38 changes: 36 additions & 2 deletions cli/tests/conftest.py
@@ -15,12 +15,17 @@
from __future__ import annotations

import pathlib
import shutil
from typing import TYPE_CHECKING

import pytest
from core.db.models import Base, Extract, Image
from sqlalchemy import Engine, create_engine
from sqlalchemy.orm import Session, sessionmaker

if TYPE_CHECKING:
from collections.abc import Callable


@pytest.fixture(autouse=True)
def export_dir(tmp_path_factory: pytest.TempPathFactory) -> pathlib.Path:
@@ -30,8 +35,37 @@ def export_dir(tmp_path_factory: pytest.TempPathFactory) -> pathlib.Path:

@pytest.fixture()
def resources() -> pathlib.Path:
"""Test resources directory path."""
return pathlib.Path(__file__).parent / "resources"
"""Top-level test resources directory path."""
return pathlib.Path(__file__).parents[2] / "test" / "resources"


@pytest.fixture()
def omop_es_batch_generator(resources, tmp_path_factory) -> Callable[..., pathlib.Path]:
"""
Return a callable which, by default, returns a path to (a copy of) the
resources/omop/batch_1/ directory, as if it were a single batch.
Any subset of the resources/omop/batch_* directories can also be set up in the
returned directory. Useful for testing different setups without keeping a load of
copied files in the resources/omop directory.
"""
omop_batch_root = resources / "omop"
# keep separate from a test that might want to use tmp_path
tmp = tmp_path_factory.mktemp("copied_omop_es_input")

def inner_gen(batches: list[str] | None = None, *, single_batch: bool = True) -> pathlib.Path:
if batches is None:
batches = ["batch_1"]
if single_batch:
assert len(batches) == 1
# the root tmp dir will already exist; we are effectively replacing it
shutil.copytree(omop_batch_root / batches[0], tmp, dirs_exist_ok=True)
else:
assert batches
for b in batches:
shutil.copytree(omop_batch_root / b, tmp / b)
return tmp

return inner_gen
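
The fixture's single-batch branch reduces to `shutil.copytree` with `dirs_exist_ok=True`, which copies a batch's *contents* into the already-created temp dir. A standalone sketch of that behaviour, using a made-up source layout:

```python
import shutil
import tempfile
from pathlib import Path

# Hypothetical stand-in for resources/omop/batch_1
src_root = Path(tempfile.mkdtemp())
(src_root / "batch_1" / "public").mkdir(parents=True)
(src_root / "batch_1" / "extract_summary.json").touch()

# The destination already exists (like tmp_path_factory.mktemp), so
# dirs_exist_ok=True lets copytree merge into it rather than fail
dest = Path(tempfile.mkdtemp())
shutil.copytree(src_root / "batch_1", dest, dirs_exist_ok=True)
print(sorted(p.name for p in dest.iterdir()))
```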


@pytest.fixture(scope="module")
Binary file not shown.
Binary file not shown.
17 changes: 9 additions & 8 deletions cli/tests/test_copy_omop.py
@@ -15,24 +15,25 @@
from __future__ import annotations

import datetime
from pathlib import Path

import pytest
from core.exports import ParquetExport


def test_new_project_copies(resources, export_dir):
def test_new_project_copies(omop_es_batch_generator, export_dir):
"""
Given a valid export directory and hasn't been exported before
When copy to exports is run
Then the public files should be copied and symlinked to the latest export directory
"""
# ARRANGE
input_dir = resources / "omop"
input_dir = omop_es_batch_generator()
project_name = "Really great cool project"
input_date = datetime.datetime.fromisoformat("2020-06-10T18:00:00")
omop_files = ParquetExport(project_name, input_date, export_dir)
# ACT
omop_files.copy_to_exports(input_dir)
omop_files.copy_to_exports(input_dir, Path())
# ASSERT
output_base = omop_files.export_dir / "really-great-cool-project"

@@ -51,26 +52,26 @@ def test_new_project_copies(resources, export_dir):
assert expected_files == sorted([x.stem for x in symlinked_files])


def test_second_export(resources, export_dir):
def test_second_export(omop_es_batch_generator, export_dir):
"""
Given one export already exists for the project
When a second export with a different timestamp is run for the same project
Then there should be two export directories in the all_extracts dir,
and the symlinked dir should point to the most recently copied dir
"""
# ARRANGE
input_dir = resources / "omop"
input_dir = omop_es_batch_generator()
project_name = "Really great cool project"
first_export_datetime = datetime.datetime.fromisoformat("2020-06-10T18:00:00")

omop_files = ParquetExport(project_name, first_export_datetime, export_dir)
omop_files.copy_to_exports(input_dir)
omop_files.copy_to_exports(input_dir, Path())
second_export_datetime = datetime.datetime.fromisoformat("2020-07-10T18:00:00")

omop_files = ParquetExport(project_name, second_export_datetime, export_dir)

# ACT
omop_files.copy_to_exports(input_dir)
omop_files.copy_to_exports(input_dir, Path())

# ASSERT
output_base = omop_files.export_dir / "really-great-cool-project"
@@ -96,4 +97,4 @@ def test_second_export(resources, export_dir):
input_date = datetime.datetime.fromisoformat("2020-06-10T18:00:00")
omop_files = ParquetExport(project_name, input_date, export_dir)
with pytest.raises(FileNotFoundError):
omop_files.copy_to_exports(input_dir)
omop_files.copy_to_exports(input_dir, Path())