From a40860d22e7d3c2202a5423f430797148ad65c49 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Wed, 31 Jan 2024 18:58:57 +0000 Subject: [PATCH 01/19] Firstly refactor tests so that each test can generate the correct batch setup as needed. For now, all tests are doing it the "old" way. Delete duplicated test resources. --- cli/tests/conftest.py | 38 ++++++- cli/tests/test_copy_omop.py | 8 +- cli/tests/test_messages_from_parquet.py | 8 +- .../test_queue_start_and_stop_parquet.py | 8 +- .../omop/batch_1}/extract_summary.json | 0 .../batch_1}/private/PERSON_LINKS.parquet | Bin .../PROCEDURE_OCCURRENCE_LINKS.parquet | Bin .../public/PROCEDURE_OCCURRENCE.parquet | Bin test/resources/omop/extract_summary.json | 94 ------------------ .../omop/private/PERSON_LINKS.parquet | Bin 1953 -> 0 bytes .../PROCEDURE_OCCURRENCE_LINKS.parquet | Bin 1336 -> 0 bytes .../omop/public/PROCEDURE_OCCURRENCE.parquet | Bin 5264 -> 0 bytes 12 files changed, 46 insertions(+), 110 deletions(-) rename {cli/tests/resources/omop => test/resources/omop/batch_1}/extract_summary.json (100%) rename {cli/tests/resources/omop => test/resources/omop/batch_1}/private/PERSON_LINKS.parquet (100%) rename {cli/tests/resources/omop => test/resources/omop/batch_1}/private/PROCEDURE_OCCURRENCE_LINKS.parquet (100%) rename {cli/tests/resources/omop => test/resources/omop/batch_1}/public/PROCEDURE_OCCURRENCE.parquet (100%) delete mode 100644 test/resources/omop/extract_summary.json delete mode 100644 test/resources/omop/private/PERSON_LINKS.parquet delete mode 100644 test/resources/omop/private/PROCEDURE_OCCURRENCE_LINKS.parquet delete mode 100644 test/resources/omop/public/PROCEDURE_OCCURRENCE.parquet diff --git a/cli/tests/conftest.py b/cli/tests/conftest.py index f8d0f69d8..9ff7f8815 100644 --- a/cli/tests/conftest.py +++ b/cli/tests/conftest.py @@ -15,12 +15,17 @@ from __future__ import annotations import pathlib +import shutil +from typing import TYPE_CHECKING import pytest from core.db.models import Base, Extract, Image from sqlalchemy import Engine, create_engine from sqlalchemy.orm import Session, sessionmaker +if TYPE_CHECKING: + from collections.abc import Callable + @pytest.fixture(autouse=True) def export_dir(tmp_path_factory: pytest.TempPathFactory) -> pathlib.Path: @@ -30,8 +35,37 @@ def export_dir(tmp_path_factory: pytest.TempPathFactory) -> pathlib.Path: @pytest.fixture() def resources() -> pathlib.Path: - """Test resources directory path.""" - return pathlib.Path(__file__).parent / "resources" + """Top-level test resources directory path.""" + return pathlib.Path(__file__).parents[2] / "test" / "resources" + + +@pytest.fixture() +def omop_es_batch_generator(resources, tmp_path_factory) -> Callable[..., pathlib.Path]: + """ + return a callable which returns, by default, a path to (a copy of) the + resources/omop/batch_1/ directory, as if it were a single batch. + You can also set up any subset of the resources/omop/batch_* directories to be present + in the returned directory. Useful for testing different setups without having a load of + copied files in the resources/omop directory. 
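+
+    Usage sketch (hypothetical test; the batch names refer to batch_* subdirectories of
+    test/resources/omop):
+
+        def test_two_batches(omop_es_batch_generator):
+            extract_dir = omop_es_batch_generator(["batch_1", "batch_2"], single_batch=False)
+            # extract_dir now contains a copy of each requested batch directory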
+ """ + omop_batch_root = resources / "omop" + # keep separate from a test that might want to use tmp_path + tmp = tmp_path_factory.mktemp("copied_omop_es_input") + + def inner_gen(batches=None, *, single_batch: bool = True) -> pathlib.Path: + if batches is None: + batches = ["batch_1"] + if single_batch: + assert len(batches) == 1 + # the root tmp dir will already exist; we are effectively replacing it + shutil.copytree(omop_batch_root / batches[0], tmp, dirs_exist_ok=True) + else: + assert batches + for b in batches: + shutil.copytree(omop_batch_root / b, tmp / b) + return tmp + + return inner_gen @pytest.fixture(scope="module") diff --git a/cli/tests/test_copy_omop.py b/cli/tests/test_copy_omop.py index ac3226913..3a5cd095e 100644 --- a/cli/tests/test_copy_omop.py +++ b/cli/tests/test_copy_omop.py @@ -20,14 +20,14 @@ from core.exports import ParquetExport -def test_new_project_copies(resources, export_dir): +def test_new_project_copies(omop_es_batch_generator, export_dir): """ Given a valid export directory and hasn't been exported before When copy to exports is run Then the public files should be copied and symlinked to the latest export directory """ # ARRANGE - input_dir = resources / "omop" + input_dir = omop_es_batch_generator() project_name = "Really great cool project" input_date = datetime.datetime.fromisoformat("2020-06-10T18:00:00") omop_files = ParquetExport(project_name, input_date, export_dir) @@ -51,7 +51,7 @@ def test_new_project_copies(resources, export_dir): assert expected_files == sorted([x.stem for x in symlinked_files]) -def test_second_export(resources, export_dir): +def test_second_export(omop_es_batch_generator, export_dir): """ Given one export already exists for the project When a second export with a different timestamp is run for the same project @@ -59,7 +59,7 @@ def test_second_export(resources, export_dir): and the symlinked dir should point to the most recently copied dir """ # ARRANGE - input_dir = resources / "omop" + input_dir = omop_es_batch_generator() project_name = "Really great cool project" first_export_datetime = datetime.datetime.fromisoformat("2020-06-10T18:00:00") diff --git a/cli/tests/test_messages_from_parquet.py b/cli/tests/test_messages_from_parquet.py index 432673995..0e64a3a17 100644 --- a/cli/tests/test_messages_from_parquet.py +++ b/cli/tests/test_messages_from_parquet.py @@ -15,23 +15,19 @@ from __future__ import annotations import datetime -from typing import TYPE_CHECKING from core.patient_queue.message import Message from pixl_cli._io import copy_parquet_return_logfile_fields, messages_from_parquet -if TYPE_CHECKING: - from pathlib import Path - -def test_messages_from_parquet(resources: Path) -> None: +def test_messages_from_parquet(omop_es_batch_generator) -> None: """ Given a valid OMOP ES extract with 4 procedures, two of which are x-rays. 
When the messages are generated from the directory and the output of logfile parsing Then two messages should be generated """ # Arrange - omop_parquet_dir = resources / "omop" + omop_parquet_dir = omop_es_batch_generator() project_name, omop_es_datetime = copy_parquet_return_logfile_fields(omop_parquet_dir) # Act messages = messages_from_parquet(omop_parquet_dir, project_name, omop_es_datetime) diff --git a/cli/tests/test_queue_start_and_stop_parquet.py b/cli/tests/test_queue_start_and_stop_parquet.py index 5c8503ea9..674ba9707 100644 --- a/cli/tests/test_queue_start_and_stop_parquet.py +++ b/cli/tests/test_queue_start_and_stop_parquet.py @@ -20,20 +20,20 @@ from pixl_cli.main import populate, queue_is_up, stop -def test_populate_queue_parquet(resources: Path, queue_name: str = "test_populate") -> None: +def test_populate_queue_parquet(omop_es_batch_generator, queue_name: str = "test_populate") -> None: """Checks that patient queue can be populated without error.""" - omop_parquet_dir = str(resources / "omop") + omop_parquet_dir = str(omop_es_batch_generator()) runner = CliRunner() result = runner.invoke(populate, args=[omop_parquet_dir, "--queues", queue_name]) assert result.exit_code == 0 -def test_down_queue_parquet(resources: Path, queue_name: str = "test_down") -> None: +def test_down_queue_parquet(omop_es_batch_generator, queue_name: str = "test_down") -> None: """ Checks that after the queue has been sent a stop signal, the queue has been emptied. """ - omop_parquet_dir = str(resources / "omop") + omop_parquet_dir = str(omop_es_batch_generator()) runner = CliRunner() _ = runner.invoke(populate, args=[omop_parquet_dir, "--queues", queue_name]) _ = runner.invoke(stop, args=["--queues", queue_name]) diff --git a/cli/tests/resources/omop/extract_summary.json b/test/resources/omop/batch_1/extract_summary.json similarity index 100% rename from cli/tests/resources/omop/extract_summary.json rename to test/resources/omop/batch_1/extract_summary.json diff --git a/cli/tests/resources/omop/private/PERSON_LINKS.parquet b/test/resources/omop/batch_1/private/PERSON_LINKS.parquet similarity index 100% rename from cli/tests/resources/omop/private/PERSON_LINKS.parquet rename to test/resources/omop/batch_1/private/PERSON_LINKS.parquet diff --git a/cli/tests/resources/omop/private/PROCEDURE_OCCURRENCE_LINKS.parquet b/test/resources/omop/batch_1/private/PROCEDURE_OCCURRENCE_LINKS.parquet similarity index 100% rename from cli/tests/resources/omop/private/PROCEDURE_OCCURRENCE_LINKS.parquet rename to test/resources/omop/batch_1/private/PROCEDURE_OCCURRENCE_LINKS.parquet diff --git a/cli/tests/resources/omop/public/PROCEDURE_OCCURRENCE.parquet b/test/resources/omop/batch_1/public/PROCEDURE_OCCURRENCE.parquet similarity index 100% rename from cli/tests/resources/omop/public/PROCEDURE_OCCURRENCE.parquet rename to test/resources/omop/batch_1/public/PROCEDURE_OCCURRENCE.parquet diff --git a/test/resources/omop/extract_summary.json b/test/resources/omop/extract_summary.json deleted file mode 100644 index ab66372fc..000000000 --- a/test/resources/omop/extract_summary.json +++ /dev/null @@ -1,94 +0,0 @@ -{ - "gitsha":"56e0eba8d098523c99f3c899979096d2c5ed4c5f", - "filesummaries":[ -"CARE_SITE.parquet: 4084 bytes", -"CARE_SITE_BAD.parquet: 3305 bytes", -"CARE_SITE_LINKS.parquet: 1201 bytes", -"CDM_SOURCE.parquet: 5823 bytes", -"CDM_SOURCE_BAD.parquet: 7852 bytes", -"CONDITION_OCCURRENCE.parquet: 6770 bytes", -"CONDITION_OCCURRENCE_BAD.parquet: 5004 bytes", -"CONDITION_OCCURRENCE_LINKS.parquet: 612 bytes", 
-"DEVICE_EXPOSURE.parquet: 4524 bytes", -"DEVICE_EXPOSURE_BAD.parquet: 4524 bytes", -"DEVICE_EXPOSURE_LINKS.parquet: 682 bytes", -"DRUG_EXPOSURE.parquet: 5782 bytes", -"DRUG_EXPOSURE_BAD.parquet: 3907 bytes", -"DRUG_EXPOSURE_LINKS.parquet: 597 bytes", -"FACT_RELATIONSHIP.parquet: 2167 bytes", -"FACT_RELATIONSHIP_BAD.parquet: 1357 bytes", -"LOCATION.parquet: 1865 bytes", -"LOCATION_BAD.parquet: 1343 bytes", -"LOCATION_LINKS.parquet: 904 bytes", -"MEASUREMENT.parquet: 6742 bytes", -"MEASUREMENT_BAD.parquet: 3982 bytes", -"MEASUREMENT_LINKS.parquet: 2309 bytes", -"OBSERVATION.parquet: 5614 bytes", -"OBSERVATION_BAD.parquet: 3618 bytes", -"OBSERVATION_LINKS.parquet: 1263 bytes", -"OBSERVATION_PERIOD.parquet: 2183 bytes", -"OBSERVATION_PERIOD_BAD.parquet: 1488 bytes", -"OBSERVATION_PERIOD_LINKS.parquet: 606 bytes", -"PERSON.parquet: 5420 bytes", -"PERSON_BAD.parquet: 3614 bytes", -"PERSON_LINKS.parquet: 1953 bytes", -"PROCEDURE_OCCURRENCE.parquet: 5230 bytes", -"PROCEDURE_OCCURRENCE_BAD.parquet: 3665 bytes", -"PROCEDURE_OCCURRENCE_LINKS.parquet: 1311 bytes", -"SPECIMEN.parquet: 4873 bytes", -"SPECIMEN_BAD.parquet: 3326 bytes", -"SPECIMEN_LINKS.parquet: 928 bytes", -"VISIT_DETAIL.parquet: 3228 bytes", -"VISIT_DETAIL_BAD.parquet: 3228 bytes", -"VISIT_DETAIL_LINKS.parquet: 435 bytes", -"VISIT_OCCURRENCE.parquet: 5259 bytes", -"VISIT_OCCURRENCE_BAD.parquet: 3429 bytes", -"VISIT_OCCURRENCE_LINKS.parquet: 1349 bytes" - ], - "datetime":"2023-12-07 14:08:58", - "user":"John Watts", - "settings":{ - "site":"UCLH", - "cdm_source_name":"Test Extract - UCLH OMOP CDM", - "cdm_source_abbreviation":"Test UCLH OMOP", - "project_logic":"mock_project_settings/project_logic.R", - "min_date": 20100101, - "max_date": 20241231, - "enabled_sources":"epic", - "output_format":"parquet", - "OMOP_version": 60, - "cohort":{ - "file":"settings/mock_project_settings/mock_cohort.csv", - "exclude_NDOO": true, - "exclude_confidential": true, - "min_age_at_encounter_start": 16, - "max_age_at_encounter_start": 80 - }, - "keep_source_vals": false, - "person":{ - "include_nhs_number": false, - "include_mrn": false, - "keep_day_of_birth": false, - "keep_month_of_birth": true, - "include_gp_as_primary_care_site": false - }, - "observation_period_strategy":"visit_span", - "local_timezone":"Europe/London", - "output_timezone":"GMT", - "condition_occurrence":{ - "include_sexual_health": false, - "allow_icd_as_std": true - }, - "measurements":{ - "include_file":null, - "include_measurement_concept_ids":null, - "non_generic_numeric_labs": 3040104 - }, - "location":{ - "keep_only_zip": true, - "replace_postcode_with_LSOA": true - }, - "mapping_effective_date": 19698, - "name":"mock_project_settings" - } -} \ No newline at end of file diff --git a/test/resources/omop/private/PERSON_LINKS.parquet b/test/resources/omop/private/PERSON_LINKS.parquet deleted file mode 100644 index bef07a1a24948de388872a05c7ea6a1cd68ebf1c..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1953 zcmcIly;B-d5Px@^ClHcR$9HqgTtT5X7RK-)h?#LF?~o4_6;b>k&Ny&ffC&OQ{0J#i zr?AST%s8o1rm#w#Dy2pKloTnldx)HfnH2hlxA%Vg_U-=O?jDz6*#H%&OqDbhDH$Wg zGtCh~c$f~Dl+bZ1Ps82_Oi_^q)XDpbk0d%IrW9={FgiLH1HynM+zhG04{wzUxIEQ<^k^A;)z;WKpW)QL!h|9 z^S#jHIhrXPRgL=5;;~sM){GUSZR309P=E2jxaX?R@j%8Yn(k(H;)4@{2ZEv5aAYp( zg7tiKE)t#%1p~tX!v3HiVE;`bWXg4ICp){x`>$~}tWeKYy4@MGdRQ>qt7hHe|C5t0 zIrWREejifqayO7Xq(2ROuq`mdPv}^$6Cxrghh?kcgs5jOdm3v6-B^FMX_zNf{W^_X zQeM4usuxZL&JK(>LdVlsb%^IwJmCLvC-4}RdVysDwX%Tw5xXFLrL#+IEbaU7ACUT(ksScfx00@lf13LJNR}Z5$XXu 
zTV*CmTEW}l@zG3w!a8~zYa)qW#k>Ciua)|cZ-9FLeR&V9_Asp>{TY%()hV5e z*~7Bg^_}OPPLY2*p5X{FjWM0uD_c4Z3|4U!V{8eZfV)pXIaFC5 zTKG(zqKu+I$1Ju5K@i4cam1RnZ!Hlr)_|#L`DC`8-(7pR8QczoB8Sv)mu;fZ_O7tT zDu~%@6qbKnFqTD;OTMk_1-CQ#+FDJo=bNxj=XcRm+qmg3*7dJx)=9W5TA}C+$OHIG zEjNohiMEjow2NQ?`@kDhwHW_DJ_&zeXZyGm&j$3vM75ag@`*)n@&TvX2Ie=(n9de4 zcqbUHAmox&pXCm)B#IbnQyScL?W9MvD&En0-+iIjNiKt{{{9RsR)E*;N$!kvKUu2 diff --git a/test/resources/omop/private/PROCEDURE_OCCURRENCE_LINKS.parquet b/test/resources/omop/private/PROCEDURE_OCCURRENCE_LINKS.parquet deleted file mode 100644 index efef741a4b9e97f37df5906f2566ece2256bb1bb..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1336 zcmcIk%~I1)6uwDAtQY|qZ%j0cx*)>>Fl{MOs#LWM1+9jH@j99OO#{P~Ni3mBk;v;Wvi()EsBET)I%XO- z+F>i;m2Sv)T0zhaL%tXAo!ttRbxr>$>7ONg+m8OLx;X`q36&Ai7#sji=o!#)1hkLP z@e`zT8W=Ab0udqhx5x>aYs-yRuT$fE-zzn`e1jhd99z0?zmM2&A{OMH+3%f1+XK!D zt5IkWPb6nkb9d*5;m+Lj)zQQVA*M5)sFcbWkJ9@o!!=6w%aW8P~iU{>pvy?$B_M9vCge})?8*LMGUnQ)OfQ* zRUmmnKKF_By?--Es0LYo;+h~C3kK0O(=rz?{Ehez581vbmi14HuGvxjt#X|Z(=s$x zC_F6Q>yvIA$#-z|sxD(J4}xohu^w})Y?-ktOIL->g@8@Gr5Rhp6ZF9e8oJWLpF^i8 zmCj>nP+YeljL&h$M#oEp=8M_7znwiS6&Jc&d*1$&rgwn(5VwSJuX2bFF8MWqZ(Sr0 zph6H{m;6pSSvf4Nc#X=kUn{SAjYkPDg`u56K3fu#K+(rI7tFVcbHWnM@B)R>qUnRa zB6i%|0JmPyT&VlSLOf`C&&r8xy|TT2;1^SM-&<(=f{z~Z19khPVUi#TYcO`77_oLK qe9`6m(?PpET?)fiFZPm$_6G;{YS6<1EY{^n& z$%=Bysieh}984)ahSGy6rSud+4keUOLP#N$(pyVta|)&O97_LxR;yhp>+}&ji*|PY zd3^u(zjhYQ^2iW%(m{GWOkI@p5+c8ABZS!DcEIg~y9Mr6xGA9$Jum?4dT1+c^~g>f zm7Ta=B8N>940Ikw2$#*_xN_Uh?%F)m?mj_Ud))F%g<@V!r^?0jLS9wN#bSC*O)q3p z)GoQCn>OiN8;i5Hr=hzCfHI+WWYhx>pys49^@M>In&28Rn*fXYsBET-l_X|QfDq>JnMHJ#@*o3w3Xw;k*w2QW1t&<}=0ESY-gp)hR|OnctECXbSz zADt!_TNUE{;4L(%i7n2^PF#cFhSb3J*G{&z&tThU_*l(@dkZC<__j^@+QzQ6vIi|d z+(2u>{%7LV5Fy`B5b_(`UyO?EXd;d)WGBucxFIzVzuLv#?=XmW8N^e`Qd;DMMtQ59 z{eW3v;54CHQhtx7^u!*#Niu8s&fAdAJ@Qr;JV#%P9{c#S7V=Gc`qu2{Fz$%n$LFXc zc(R4IJelxq`cAn`Ujrm8M#03!0=V~rhdOHo(A#KZDU;Le+_Xzuc6P(XZghHN3(y@@ zS=!x3U14e$Ip%Z?T^8Z#rINV{a8!0;i;grT;NI@ls=($V>{gEv9k;=}R4t?znsV!^ zP5Q*f?sv28ZU~ft*o1FM7|J;exsTcXVDqyo{I1Uko@BT{S6B}qD&6d3zwh5sX+v1RC^WSxEI2CTYgdHDkf3J{ z%O8^Ag#EdEDzlVH7p((wSzD$!O`OdtUaHcPBTwA9tT1 za_hQ!KAlTa59CI_eaG(wZPqxKHQDmy)25ovmVQ?_3q?Skpu_YU!B+@YbbnuX^F_}9 zmWj4~=a-*0Z^F>u3}zLk{L!Z^)mRlXp&dZ7URs*I{T3xas0$XFRGDMYPr@GEn5Siq z24Wq@8iwR@d)$xq{IhOPzQFeSLuv26XV`Q7ig+$3|85-SY?sbwwzp?0``^bT+l`Zt z?b2b!p3ecqcIQly9(GAC)+XKQ?jywQkzG6zITL$BJ7MJO9X`B{@F3@W4sN{O=Dfm{ zWj@aNG7l_^F-75g8hZiGL-+()e}Z%zNAj=@24UGu1(pR=*v`+2Wpf}0L#Sbik3qKr z+hGFaDs*@VmEcgVJrg|T^PK`|Y#detlprCxD01Z-Hl;;>q>#vEvubwa(&BKmI`5Az zrE=rzsTj@&#?Z8h*nu@LT~M{`uI&p#(4j ztHCa$7MR!Z_<6B^6U=zsO-*D==&!k12r9-5_~CxcIX;$}iSj_Gj;|Tn>^~8;hIp9p z0p%p-3qtbg;E`JX1+@G_B{hB6QxICIsJW4qL@Y9-<{Z{7_XR-zVz{UtAS*F5n7#k&YsY zhMzu7WN#r^yiiV;j;e*i(PXihuXs1$FS1O2%{w%FZ1C8i`w@I?{>vYE+)2nR{1^E* DQ;{8I From 0a39b2f434bd7e04cdef0f1f4307a0d6ffb4b526 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Wed, 31 Jan 2024 19:34:50 +0000 Subject: [PATCH 02/19] Temporarily point to correct single batch directory. Will change this back to multi-batch when it's implemented. 
--- test/run-system-test.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/run-system-test.sh b/test/run-system-test.sh index 7a65e963f..f0efeb87a 100755 --- a/test/run-system-test.sh +++ b/test/run-system-test.sh @@ -28,14 +28,14 @@ docker compose --env-file .env.test -p system-test up --wait -d --build --remove ./scripts/insert_test_data.sh pip install -e "${PACKAGE_DIR}/pixl_core" && pip install -e "${PACKAGE_DIR}/cli" -pixl populate "${PACKAGE_DIR}/test/resources/omop" +pixl populate "${PACKAGE_DIR}/test/resources/omop/batch_1" pixl start sleep 65 # need to wait until the DICOM image is "stable" = 60s ./scripts/check_entry_in_pixl_anon.sh ./scripts/check_entry_in_orthanc_anon.py ./scripts/check_max_storage_in_orthanc_raw.sh -pixl extract-radiology-reports "${PACKAGE_DIR}/test/resources/omop" +pixl extract-radiology-reports "${PACKAGE_DIR}/test/resources/omop/batch_1" ./scripts/check_radiology_parquet.py \ ../exports/test-extract-uclh-omop-cdm/latest/radiology/radiology.parquet From 7c628c7b282d8504bf35e734c19d0a239e0e0f80 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Thu, 1 Feb 2024 12:35:02 +0000 Subject: [PATCH 03/19] Split procedure occurrence test data between two batches. Because one person has procedures in both batches, there is some overlap in the PERSON_LINKS.parquet file --- .../omop/batch_1/private/PERSON_LINKS.parquet | Bin 1953 -> 4264 bytes .../PROCEDURE_OCCURRENCE_LINKS.parquet | Bin 3100 -> 3077 bytes .../public/PROCEDURE_OCCURRENCE.parquet | Bin 5264 -> 10585 bytes .../omop/batch_2/extract_summary.json | 94 ++++++++++++++++++ .../omop/batch_2/private/PERSON_LINKS.parquet | Bin 0 -> 4252 bytes .../PROCEDURE_OCCURRENCE_LINKS.parquet | Bin 0 -> 3068 bytes .../public/PROCEDURE_OCCURRENCE.parquet | Bin 0 -> 10545 bytes 7 files changed, 94 insertions(+) create mode 100644 test/resources/omop/batch_2/extract_summary.json create mode 100644 test/resources/omop/batch_2/private/PERSON_LINKS.parquet create mode 100644 test/resources/omop/batch_2/private/PROCEDURE_OCCURRENCE_LINKS.parquet create mode 100644 test/resources/omop/batch_2/public/PROCEDURE_OCCURRENCE.parquet diff --git a/test/resources/omop/batch_1/private/PERSON_LINKS.parquet b/test/resources/omop/batch_1/private/PERSON_LINKS.parquet index bef07a1a24948de388872a05c7ea6a1cd68ebf1c..4381f36a5066112c3e26a264905460b5f174852d 100644 GIT binary patch literal 4264 zcmcgwUvJu06gQMol66yB)d(aWBDA8a>L2U?4b>iE2oO_30x34Qtup@$7;MA91e~hU zp7#m%*r%!5+oU~B`v7~I_96PThkbxOOgs1T&wx{HS~@GS?>*<-bAI=C&ONuyF;k^T zAGt-|DZz)hO%Q>IhadZ|t2X zLVCkV!oL*`ZS>UsplhCL3h52{g5P?AZ#}WySl~n6!YC#|dJ%#x_yCj@h%Z3)63N#f zB5BBn^a$c&1_CF#=v&BbRw|1q^JL01heuMCxbc?M2>GJ`mmr;rtRr?0q0YZcgLhI@U#5 zJvuv4`{$ZGr|x-Bh`kHMz6nHNt!un(^79!%TVH^Cc<$}t4{?8Qm6s4=5T5&cv(ddh z$byR$jGS5i>5~Q2O-51r>R>QS?=POUStG$6YLxryi1fKumN%t#OuUPo$e)$njTkK<=j@Z@L?-i6} z?0+eh;Me}3FBS-X=f6Y1xd{1tqN#|3&<{6WM>SJX$ER{v9~$N$`gHrn>u3jLFpBm? 
zv#CZOY)4JeP(ff0bsc5{OYB=PjpK2*2SXYURWzjDyn@NZ-zj#kPTekhInz{KIbD`b zV4Yfa4~2^kO-=4969hw}XU`Q~sjbQuLT0FzsEC#b=C7lw3FeyFL~Co$^AC5UZyaEj zQAFqW19GzjW?8j@Ni(e-8gsIQXfAdDLUg@(0Jx@`jIQw_Rc{=~jaN zyIibqeYIb^gm*JpL2;&cMJtt9=ij0DPzDygcs>5I?g|3qbI?~stJ}xrscTXn?o3pj zJzMPeyCVeR9R8i6bxkCt5|84Ks3_D-bQ<2Ne3V2wB6O3fR*(3nleQmv``KgkP)wd- zmD|$0>Z>-3ZLwO|Bi5iV*^HNq;l!FTF+p= ztdZ74gKH~Utha=EwFN%njT69oFwg`(*_P;PPcmfekK?L^((|Tvwt`)@!9H&Re=}Ub zLxHnpI*s<#nQ*m=-NnYt0Jp#&<;18DmHk6|$~Ps|GfC$jAusX63sys1N<0^D@YR-5 z$jx(db~0*0{^ne=dIt2mE}7MwQZKd|##le0x#VRSYru!db3-LprDP+oNd>MX)H^ty z?85mH9+R`=1N}@n&_|Gi6y(1rnU#w}&4uqml9CGJbSaDKaKU5k0M_t%Tdh&HG+Ac= zo`H~0JNIOZJOy>9mt`YyDYpMq50~L9j9U$wI~Me^E}H_>+kzj#0GSRU9sbGHZdx-a~i65c5#kqprgxobVjg|FXWbasCMe~|_J)!;&eY=6alc$-j&HQEXO5D_){xF`U^8;%k9^4b? zQrOSU)-x^avV-31cwEmvSY~&p>lx+*udgtf3X{dW%IJ2P1;a|F!s45#I5nLJ!3&Ze zWZ>+;ouj9F0#-E9df2aw{7s)6-Z>H4?uq@chN|^Y?)4txi+6i^v2IVU-0(~I50LYR Nc=ahk9K+uz{sv{4Qs4jp literal 1953 zcmcIly;B-d5Px@^ClHcR$9HqgTtT5X7RK-)h?#LF?~o4_6;b>k&Ny&ffC&OQ{0J#i zr?AST%s8o1rm#w#Dy2pKloTnldx)HfnH2hlxA%Vg_U-=O?jDz6*#H%&OqDbhDH$Wg zGtCh~c$f~Dl+bZ1Ps82_Oi_^q)XDpbk0d%IrW9={FgiLH1HynM+zhG04{wzUxIEQ<^k^A;)z;WKpW)QL!h|9 z^S#jHIhrXPRgL=5;;~sM){GUSZR309P=E2jxaX?R@j%8Yn(k(H;)4@{2ZEv5aAYp( zg7tiKE)t#%1p~tX!v3HiVE;`bWXg4ICp){x`>$~}tWeKYy4@MGdRQ>qt7hHe|C5t0 zIrWREejifqayO7Xq(2ROuq`mdPv}^$6Cxrghh?kcgs5jOdm3v6-B^FMX_zNf{W^_X zQeM4usuxZL&JK(>LdVlsb%^IwJmCLvC-4}RdVysDwX%Tw5xXFLrL#+IEbaU7ACUT(ksScfx00@lf13LJNR}Z5$XXu zTV*CmTEW}l@zG3w!a8~zYa)qW#k>Ciua)|cZ-9FLeR&V9_Asp>{TY%()hV5e z*~7Bg^_}OPPLY2*p5X{FjWM0uD_c4Z3|4U!V{8eZfV)pXIaFC5 zTKG(zqKu+I$1Ju5K@i4cam1RnZ!Hlr)_|#L`DC`8-(7pR8QczoB8Sv)mu;fZ_O7tT zDu~%@6qbKnFqTD;OTMk_1-CQ#+FDJo=bNxj=XcRm+qmg3*7dJx)=9W5TA}C+$OHIG zEjNohiMEjow2NQ?`@kDhwHW_DJ_&zeXZyGm&j$3vM75ag@`*)n@&TvX2Ie=(n9de4 zcqbUHAmox&pXCm)B#IbnQyScL?W9MvD&En0-+iIjNiKt{{{9RsR)E*;N$!kvKUu2 diff --git a/test/resources/omop/batch_1/private/PROCEDURE_OCCURRENCE_LINKS.parquet b/test/resources/omop/batch_1/private/PROCEDURE_OCCURRENCE_LINKS.parquet index f4a1d63704efcb73a4b56b9aac9a1b94c02c62ff..9dfa295a29dd4268e28a0b5e08554788afd981fa 100644 GIT binary patch delta 447 zcmbOu(JB!i;22~m$|9;Ds^TNcA<7`cz#w45$iTqB1jNih%mT!s45A{U5;{P60Z}$l zHW>*Pkf;O;NSzpi1`-b_z@-Ax%E8FQ{DeVm*W^SdHCYa^Rg7Yr7}Y$~I5722Tx~1? zGuVs+sFhs;Yybm;+O>(_6k<8VLd2rfW-+V%VUhvLOX>h!%OJ`Ga+Cm&0=kYxR7O<6 zN0bTddOi~_pqQhhp^>qPshNQxP??aZ7{cvlY#Lk$*~yKJVr)RSGN^S-p35jF!y&eW zQEUyP+6-2;J1jD2YVDcjCx2s1Vds#Pk+dPsYcfn=Pw=67V)6$j5k{gcRNz>|EXJY6 aA+}?3I&%c0$>syh^~{Wcn;qEiumAvs7e_|` delta 502 zcmZpbm?M!9;22~m$|7nYYT_fxBg!Dez#tLB$iTqB1jNih%mT!$K+Fcjq70&9qEb3Q zbpoPnqHHn}EFe({7La-|1`Q-0P=H$lWCRZ*Gjqo=CbcsYG@ltJ*sj8C2bq4=^^f^GM1_+7Rb+Sw@im olnMHeIIHD3#V#<5@u+c#J)0cB9KmR|c_ni_Gh^gtP4+u10A{65eEQ4E#;QA5GEM=->?) 
zzeLZ_Goj!#_6Da>EG-VO|^wPNNJew^{(!VcxH1EVS1w^9oI{Y)dnN>$ZJ!hWYe+ z0Nm2%((MU6NQYLz=V;yYHwHP1`dfyg{sVuboWRlGG>)TUtK|UwHpTq=tOfdl1zO}Z ziBRD^AFzS>+|T?O$zbVmY4wEr61{BFcnEmp=&6(+172IJcxV6n$Ht`2t_~m@gk=e*0Jm z^c$ESxRU2K{q*@&dWv{u`h4_t;tdzyu{Su4PqdrW0_a1&`zhv|iV@5(i9sz4XN$zia*lKn>4t>(uJMw?VV<~4F^(>le>DvBF{d$&iv-OC(lmK99k_bi^GMT&TnEQf z=T~Le0X+?C*wO~342)v=t;?YoFfjDV;}e}eZqs1q0{JhddK+WRL60T8LYJb?0lZ-# z=}j-f>nLms*)6XlL1Rwhimt-jcSJ+LKO368f9eI=sT5SqDc3!xSE4-HO5u^5?%(yu zW=`l7W={9yWFGo3WKQCIV@}}EVjj&w!+iJCBk<0JfS>UPzPfOYf+{9B)8bT->jeLM z`F6OWiqZiTL~>VAJK^W&Ub!9Khdy`h;gg>TG-RZfwhL+#2kv^pBs)}R?7@68sx zE|OOXU!!+kg@&$gn<$qIwN5Qihsol$vrmV$jny(}3U}3p&=hq9U8OY`4m9~aNzf3P zB553)N*sva4#Nc)3Ou!rbGJ6rH=hgNv4L@-2wR1L*0t8ZS4JyfnzSxpI6zx7V!a)ch{5h;pvcujiEbE??e4 znXXtq=&YvLhOkiF71RyrWBW$CC!}&qx#f*cE|otNGo?Q2S*t`v9IwU_9kCqkR$}_v zwaReKZnYehQI=Or9X^HY)mpXk!G0y?v2G>}*wcrVg<@RDASSjUmg6nZzhAqR!?gip zv@ews5vh_SdUHym!6~IZF-69;TBW!PIwI9ufH&J|)XMQaK2~h;ia_+0O2y3TX1%fF z!Y&LzKCOZMOk7}xY`#~GmG;F-UarIvd)0-~Ku}6u=-c7S8x7ohO_60jQ{3T|v<4WL z0ApWJ5_`3BF$%Ife5|ks^9P8dR!X#LI?jF-d{M0wcEqgQH|Di?b>ncYF>ar7S6p5j z=st_eVkQmqi@QVHW+NTkuN_CNl0W2Q@rhX7GJ=N>fH#q~!yMT!_)!zG@;Ka1`)A@& zK68**O(8GG*%f%I6l5V=oQTtIQ{V&opf9E$tv&4+kYffX6|P+B)Y5>-;CfC?4#4kn zEn6CZ&(TkMtf@(eJBY2QoxjCh5x9h$G zwZ)8#+{E}f5>}hM7f;E4h)kupWaN-2UrEO8aZBt|fw(Jg#R_SyD?}su9-Vu5Hm9OY zX$g3$$Me2eD5dg=2keW)Kgn^&zpd zmp)(*GH_w%7(8;zd-+BNj1lQ!a^;a?wbTWkTmC;9r!3q$9|#Y;@{GlqQbZb?2a*4! z2nVrN*emk9S`fW*BE$rdk(5N&$gkPW(gx)Hkvys4`G9#`kUe-Ya$Z54mJ|D28P2gv ze&_#`iw?wcx(hLA`r0F|(@{8MB3z{yAJ6YvhmWNrg*_v`jOKQQ&m?xmB2P{n>FR2SFLQs3SM==QMkTP z=)t*-XLhVwK8PMMe;)EsBVc_XDru;R^t|rLb*N*+?EY!x{UHtyetK}lab8WD>y>j( zx^A(@;}DD8o*p&V1LvHI4@!_rQu#fo^B^ZB23U(h-sy1V{GeLN$FU8yKpfr&{DMGy zEh*4rou7ZMKEE?R57J3?7iY2^e31JJ zzOXEh4@aGC>2jRWY;rRJ`cOOxyMy)3_6lqgKKi9f-vzdlReU&VaUqs~njCV`LTwlB zMlxD%zBfOwt}ht-T_c;si9Bvyb;g55HrC-ni#F&REW!5`sMFX~&eYEqQ4{OdPf(20k6O^qWPfYR$QSHN>Hfq; zjP>v>iKcM+)2o(4QElDu2fKoi!6r@o8%^ThRqV9!nMh?FInb(cUxr7=bNEvPc_p4$UNhQeJi7Tws4rwNM6G;{YS6<1EY{^n& z$%=Bysieh}984)ahSGy6rSud+4keUOLP#N$(pyVta|)&O97_LxR;yhp>+}&ji*|PY zd3^u(zjhYQ^2iW%(m{GWOkI@p5+c8ABZS!DcEIg~y9Mr6xGA9$Jum?4dT1+c^~g>f zm7Ta=B8N>940Ikw2$#*_xN_Uh?%F)m?mj_Ud))F%g<@V!r^?0jLS9wN#bSC*O)q3p z)GoQCn>OiN8;i5Hr=hzCfHI+WWYhx>pys49^@M>In&28Rn*fXYsBET-l_X|QfDq>JnMHJ#@*o3w3Xw;k*w2QW1t&<}=0ESY-gp)hR|OnctECXbSz zADt!_TNUE{;4L(%i7n2^PF#cFhSb3J*G{&z&tThU_*l(@dkZC<__j^@+QzQ6vIi|d z+(2u>{%7LV5Fy`B5b_(`UyO?EXd;d)WGBucxFIzVzuLv#?=XmW8N^e`Qd;DMMtQ59 z{eW3v;54CHQhtx7^u!*#Niu8s&fAdAJ@Qr;JV#%P9{c#S7V=Gc`qu2{Fz$%n$LFXc zc(R4IJelxq`cAn`Ujrm8M#03!0=V~rhdOHo(A#KZDU;Le+_Xzuc6P(XZghHN3(y@@ zS=!x3U14e$Ip%Z?T^8Z#rINV{a8!0;i;grT;NI@ls=($V>{gEv9k;=}R4t?znsV!^ zP5Q*f?sv28ZU~ft*o1FM7|J;exsTcXVDqyo{I1Uko@BT{S6B}qD&6d3zwh5sX+v1RC^WSxEI2CTYgdHDkf3J{ z%O8^Ag#EdEDzlVH7p((wSzD$!O`OdtUaHcPBTwA9tT1 za_hQ!KAlTa59CI_eaG(wZPqxKHQDmy)25ovmVQ?_3q?Skpu_YU!B+@YbbnuX^F_}9 zmWj4~=a-*0Z^F>u3}zLk{L!Z^)mRlXp&dZ7URs*I{T3xas0$XFRGDMYPr@GEn5Siq z24Wq@8iwR@d)$xq{IhOPzQFeSLuv26XV`Q7ig+$3|85-SY?sbwwzp?0``^bT+l`Zt z?b2b!p3ecqcIQly9(GAC)+XKQ?jywQkzG6zITL$BJ7MJO9X`B{@F3@W4sN{O=Dfm{ zWj@aNG7l_^F-75g8hZiGL-+()e}Z%zNAj=@24UGu1(pR=*v`+2Wpf}0L#Sbik3qKr z+hGFaDs*@VmEcgVJrg|T^PK`|Y#detlprCxD01Z-Hl;;>q>#vEvubwa(&BKmI`5Az zrE=rzsTj@&#?Z8h*nu@LT~M{`uI&p#(4j ztHCa$7MR!Z_<6B^6U=zsO-*D==&!k12r9-5_~CxcIX;$}iSj_Gj;|Tn>^~8;hIp9p z0p%p-3qtbg;E`JX1+@G_B{hB6QxICIsJW4qL@Y9-<{Z{7_XR-zVz{UtAS*F5n7#k&YsY zhMzu7WN#r^yiiV;j;e*i(PXihuXs1$FS1O2%{w%FZ1C8i`w@I?{>vYE+)2nR{1^E* DQ;{8I diff --git a/test/resources/omop/batch_2/extract_summary.json b/test/resources/omop/batch_2/extract_summary.json new file mode 100644 index 
000000000..ab66372fc --- /dev/null +++ b/test/resources/omop/batch_2/extract_summary.json @@ -0,0 +1,94 @@ +{ + "gitsha":"56e0eba8d098523c99f3c899979096d2c5ed4c5f", + "filesummaries":[ +"CARE_SITE.parquet: 4084 bytes", +"CARE_SITE_BAD.parquet: 3305 bytes", +"CARE_SITE_LINKS.parquet: 1201 bytes", +"CDM_SOURCE.parquet: 5823 bytes", +"CDM_SOURCE_BAD.parquet: 7852 bytes", +"CONDITION_OCCURRENCE.parquet: 6770 bytes", +"CONDITION_OCCURRENCE_BAD.parquet: 5004 bytes", +"CONDITION_OCCURRENCE_LINKS.parquet: 612 bytes", +"DEVICE_EXPOSURE.parquet: 4524 bytes", +"DEVICE_EXPOSURE_BAD.parquet: 4524 bytes", +"DEVICE_EXPOSURE_LINKS.parquet: 682 bytes", +"DRUG_EXPOSURE.parquet: 5782 bytes", +"DRUG_EXPOSURE_BAD.parquet: 3907 bytes", +"DRUG_EXPOSURE_LINKS.parquet: 597 bytes", +"FACT_RELATIONSHIP.parquet: 2167 bytes", +"FACT_RELATIONSHIP_BAD.parquet: 1357 bytes", +"LOCATION.parquet: 1865 bytes", +"LOCATION_BAD.parquet: 1343 bytes", +"LOCATION_LINKS.parquet: 904 bytes", +"MEASUREMENT.parquet: 6742 bytes", +"MEASUREMENT_BAD.parquet: 3982 bytes", +"MEASUREMENT_LINKS.parquet: 2309 bytes", +"OBSERVATION.parquet: 5614 bytes", +"OBSERVATION_BAD.parquet: 3618 bytes", +"OBSERVATION_LINKS.parquet: 1263 bytes", +"OBSERVATION_PERIOD.parquet: 2183 bytes", +"OBSERVATION_PERIOD_BAD.parquet: 1488 bytes", +"OBSERVATION_PERIOD_LINKS.parquet: 606 bytes", +"PERSON.parquet: 5420 bytes", +"PERSON_BAD.parquet: 3614 bytes", +"PERSON_LINKS.parquet: 1953 bytes", +"PROCEDURE_OCCURRENCE.parquet: 5230 bytes", +"PROCEDURE_OCCURRENCE_BAD.parquet: 3665 bytes", +"PROCEDURE_OCCURRENCE_LINKS.parquet: 1311 bytes", +"SPECIMEN.parquet: 4873 bytes", +"SPECIMEN_BAD.parquet: 3326 bytes", +"SPECIMEN_LINKS.parquet: 928 bytes", +"VISIT_DETAIL.parquet: 3228 bytes", +"VISIT_DETAIL_BAD.parquet: 3228 bytes", +"VISIT_DETAIL_LINKS.parquet: 435 bytes", +"VISIT_OCCURRENCE.parquet: 5259 bytes", +"VISIT_OCCURRENCE_BAD.parquet: 3429 bytes", +"VISIT_OCCURRENCE_LINKS.parquet: 1349 bytes" + ], + "datetime":"2023-12-07 14:08:58", + "user":"John Watts", + "settings":{ + "site":"UCLH", + "cdm_source_name":"Test Extract - UCLH OMOP CDM", + "cdm_source_abbreviation":"Test UCLH OMOP", + "project_logic":"mock_project_settings/project_logic.R", + "min_date": 20100101, + "max_date": 20241231, + "enabled_sources":"epic", + "output_format":"parquet", + "OMOP_version": 60, + "cohort":{ + "file":"settings/mock_project_settings/mock_cohort.csv", + "exclude_NDOO": true, + "exclude_confidential": true, + "min_age_at_encounter_start": 16, + "max_age_at_encounter_start": 80 + }, + "keep_source_vals": false, + "person":{ + "include_nhs_number": false, + "include_mrn": false, + "keep_day_of_birth": false, + "keep_month_of_birth": true, + "include_gp_as_primary_care_site": false + }, + "observation_period_strategy":"visit_span", + "local_timezone":"Europe/London", + "output_timezone":"GMT", + "condition_occurrence":{ + "include_sexual_health": false, + "allow_icd_as_std": true + }, + "measurements":{ + "include_file":null, + "include_measurement_concept_ids":null, + "non_generic_numeric_labs": 3040104 + }, + "location":{ + "keep_only_zip": true, + "replace_postcode_with_LSOA": true + }, + "mapping_effective_date": 19698, + "name":"mock_project_settings" + } +} \ No newline at end of file diff --git a/test/resources/omop/batch_2/private/PERSON_LINKS.parquet b/test/resources/omop/batch_2/private/PERSON_LINKS.parquet new file mode 100644 index 0000000000000000000000000000000000000000..33492439bf223e565e5a1ddc429ce65934542e44 GIT binary patch literal 4252 
zcmcgwOK;*<6gGs(tJBfUXpD%&B0?H8s^)?31qOrO%?eSFxZ9~6L3be z>o2If=yFtbG0S#0%l?A?gsO`!tFF7MdhRt317@VEnOecz^ZxER=boG7l&(@_h`dWa zD8UzTpCC4)L4qLG;6oDReKPzQi`U8ZXe5NVkr0xHiANp^7+brD9SQ{2g7J5OC>h*I z6YF<(B3nJx>UYfxO(BEfQ24t*_-!D55a0Zew;dFdAcF|OUHAeh3lPtNcY*jz5D_;L zLUIIgMghYM@z1px#CxZrt*NGcIIu)XS6`}*2kPI!_%A^KJM*;Q*B5MpOil6`Ip2cV znSC|45+5jS*NG5<>1iw!_+M4P`HpW^Dz<*nhlDEgX-9EK6L$sV@%fyI(420hY z;(y(Wzq=J(;mj8;q`CxPQ1wwp?~wOrMc)qrb0dO_OAzs2H>2eCIQ+d+U6j?6%QMxw z(&TC0UWJAD_nYyrH=|(d8xMC|aNKR+9-hxWU(7~ib0V3o!of#AT+nk%yA}upiTDqn z%ph*liejn#{-mbw1L2x@n$E|QE zz8U^z{Q&{TA+p{RO-1ZSe%g2)(@aGjUC3R1V3_^bi`_3?$2!1+Rm>92rW$*^8#6^i z1%^4$b)fpTXxTtgc-`&6at|+6w4~mA354tK0y|g7Zs)yRYO1bW%=27W7q-(w;bH?* zle>zGU|{r|sh}&hRoOwIHrBV z`>8CTxYWC%y`Ngnzen()3d~0Fa{PJO4FtyLz*0rKYvK0PHOYdT5_M7Bty|zZP;XubW7rENN1q%qkbt%Yw}*TC7TuC z(W5baC1=@v_VBpRW{cNKfg2(}$8}0U@-t_bEf)|bT2uIR5Bzt8GZw`LjMPx&G6}WLV85)9(L{r5D_JbJ zg?hCGJ`#;Hz|4yX=5|-iG{5a6ujloFgYQXkQ&S zSBuzvbW8?v3;CnG81;d2c-z+($A7ydKJbRg$@v@}le6U` z{Zcv7hfsrks6kIME7!-GPreIjN-B&pr7Y^hSsrUgV8iDfwMIFTYi9tSzL3v&_vDB? z1%0QNWh1o`+kfhZRrm^{RwK!s3VK#lad|Rkxj9QX& z6hnu#t*dT-4jXXB8uhX_Uxrl3w3Nd}VpaVGV|3N1SCVDF9$taD!dE-8!ClLVY3${k zcJdwJsHn?(NU6Z%pzpD4F0t9z77p!3-VOwgMd2c3E7*}ljZokqQwcIOV0 zhP5N@$=Hu#p!20)K~F$mv7>S}H7vDr8tQj;c8-!ly?8D1nF03E&JD_EsS#@?(9|oS z*QAGGiDoYS482~WdC8>E@&vuyA=*Ww83=|RVWy1Ucz$9M&1ul|<`!nJa|K4oU6W}n z*mse>$G(c_ntwl``44lufxg|RnXOIyG1w9hb+12+XYTxfEyRO+0$mD+xrsg9!Y(`L zy^P2A{DWrpU~Es*XV|_%GZi|E^D3j;WfBYv=?aT)qT*Q9Wr7#PJ!0VOz@4LKdIAz;ef5SUjKDY3x!yzV*QJ?Gpbzd}8Rj?s7N`z=_g zdlYpmy-QJ49R4&#-=he%daPO@)xk1CWFiks=NR-O`x0|MKuYi4y zR8{vpV`g^(OVM2d>$gba&q(ga-Q3@?^bKCG0-S1*iJx~8vD~f1?>qM?=$mBRk!?-(lK^1itJKchIli>xthV2(vlCLeaYw-qHoXR(k z0P`vV*-IDxGLS1xEcZ$=j&cdnr5%r%UpO&%Vab zVswBI>cSsCFAyrC>Il^l8ll>V$g0&X{TvHn&q`lK`ymf+;)UorgysnL!HS*i&+q~~ zLLILL)Hj@E22z7BifrdpIplrCZW}L+s-iEOt}K+M++;T5?B-apG?O#tBhD&~72z1$ z1e&n$S~X;-h2B`TyC6gP6@RMMxH7jt@VHv@Ol$CS?58a<8rBckil+%oAhKa>NnFPa z1_EPZTgB!*rH124jwCFmB6~&LMjf!%&%{Enq&6@nWN1Rk0sm8}&*9jBk(uj!C9jKB z;+L~3hHUW@tw!{|B=*MOBR}W^-b2rjgwjM|dyZnM#2?T18m*&|F}#3XU4nhxhy0!6 zf;=2HPX{bN)x@SLvX#l8z%Nyc4?s4Qg|30mE?K6jGfFCRhdXaj{)1E z!geOGe~EEOe8rJMobv(HVjy;g+MzjL?Q;~nd)79tJEs|F`|V{|Gse^!b=cnv^TX;X4EUvTz1)pHzEF`U`RZbn{eOk~*@I*gTVk zN-|tf94(8MVRp+RUU65dyz$0r+R&oe3zX*#&yu9PyfphE+ y;?1%D)zd#G5TcS9M|QstbpNsHIFHF-&#VS_CLHOh=kU=M{;03u`}Gxk_x}&1R8-Ue literal 0 HcmV?d00001 diff --git a/test/resources/omop/batch_2/public/PROCEDURE_OCCURRENCE.parquet b/test/resources/omop/batch_2/public/PROCEDURE_OCCURRENCE.parquet new file mode 100644 index 0000000000000000000000000000000000000000..b66ca7f39227e101cdff5f8edea71fdc515374d9 GIT binary patch literal 10545 zcmc&)OKcn06&=ZpD!Z;@%O*`FEFuuiphcaCq(s>a60}1~{8geQS{i?BOcM1DP0Ap`Bfa3_*Zb63=@Q(sE;L9zk z-DzrWh(EnVe?AQ$Z3!NECxcx0;diK`nY0f`>!(y42?uZth1x9--Y1uByw_~JBBx6Z<9{?uKSJ}{ zVmv~9kuC(kVe$3)>(mGT`T1{tbt6Rm`u%^s8Ts_)2n}Yf8R1!L4s7Dn>p8&(S0llI z$>TZq33ORCPVxJx&>yGh$6uvCyAlEMCX^F5@;%jT@alSSns{O0YW(|ty2svd0H3HP zyX8b4py>CXw{<^nBi9ElY1c!TA5VcB(jQ)<#1o#2|JH-i8qd4DyGdt$>?qQ&zk7SEM$WL!lX-ian?1QB z>pmF(cl>9V{xtklJAzVdjy-zOR|LH2nx~pm1#D)VzTNl{twkh6sP#5ecEdd}RKM3$MWF zi>R3B3$AeJ3#&-zXId#dlkEOwQ#O50S@?o8C;imPkUo$6MxR5{qMuEop}&0g2z~Um zP=KBdJ-&8}f+{9F)8aId>x4hM@h~cDqI3iWkCM&X7F1)kQxwL9C{SMEjMae#57h}wmL z>w1hrUC<^?^x%~>eJMforAl!B$lNdT-lJ|J*NKSlv=D3<7zuBsW50o>j zh?Ea)iPM{HlGPQNZ*%PdCiT{UYqy(y)J_kQqTJN5H@^I8?A3S_Vy2ZHorA9?af}LI zWybNG|!BKb5n7>3z`Km2@;VV0S6gUK>XgUK{VUpnm??_=lMa6H2ljW#@q z6f$05m_7sR;2VXkE@$8gZHBqufWGwIf~s`uId-|4DE0U}tMl51d|yuU(xBMpD#^pb zL9Gy-t%A=1~5OXgZ?aBpoe_1S4*&mVzsDL6RCsR5<3u7whMiG 
zTxCnfv)5Hc;dA9ZUd`%&aRo5;1vPb0uax63caKk$4q*KNan#vVt8U=z*T5FFYH3f* zD}8fai`TY}H{@~WRJ!8o#=x*yToH3wSYO^B>NX$i;CY=m>eb>gpGZ!`>YXF_*Z_DF z+c++e^MW09A+LSL%6o05(TE8NQ~a!SBGg;!ggS_eJ13a>+-xy_(umYD)&! z_zD}VRdD{=&|mi+^c&#Y6ZaN#3UU+u=Tul7^Zj^A&O_!@%PVFKiSyNT(&@KEKUMI% z%1)w`)w@DGW}MM|hPwp~=ddflQ^TJR#S)t-rcO8)@qZHI&d43&Z)^%;0{CM!9fO#4 zNY3DE8>i0!mpx~6`GxyjB$l3V1_ijVcLpAL=lj`40gN%}X=3H6e3k72&u#mkj#D;n zgAarUesRX;3>%Zi#zEvi8{@#&N(W`0*Gi&aOaz}Gb0js@HREf3o85xAKN2T(%numH z1;vLKBl!yaw30gHDv-yj#l8PiEIJY^*)I5?WosY5&c-3f#JFlXIiBD34xh-zN(W|q z8IA2KpG)nFtF@S)j!8}*%{vLMT53UThTK_-3tAEEUxNH;`k9YDMO8TloM*|E9$cfo zSyzvGwdz))W}zlB;<_)_p^g#rhZhz1 zhd6xr>BALQzM8bwFXo(g-Qx7eAr_}TeQK@~*IbAVScoN=;sMlo5R*~^ti>Sibht`! zP^%V`*oIml3GV}bNg%csRj45oN{^SZRxBKqGRyt-gEbj*1MZ8vQ;58u~!@EQY z+t7fOEg`u)oz_gG0i{xKtUZz;?QZ z4_7TNBvMe5Lo8aV@59|lLCr1p78kY6CG)%+Pz-~}4jNAj5?cQBu zePeZJc-|u-M8K!xyac*XW%qfgd%*w6q`l#KW)1O>C5FDp1jqVqTrf8OFsprYe};L0 zGt-PEFJs7S8ol@=p9(%*`cHUrJZNNV6E3uBwQ`f$a1A|`zge~KWl#1g9;X4~>dFFy(Og&fAH6yE0v|2mfbOnm}~ z9TRUJNdX_zi1=#Sk7&keKitZVcjRtN^xH>0gIsc66lnN6^Tr!gc%j9$-|kBKD?+RF g3i-ii!TdpH!T70W{)-4jo$?>`?*)q5g#Rr0Khe=i7XSbN literal 0 HcmV?d00001 From e2d091720dcd9a1581d0b38083a04f71c54aff52 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Thu, 1 Feb 2024 16:15:04 +0000 Subject: [PATCH 04/19] Dodgy batch that doesn't match the project name + timestamp of the others. --- .../omop/batch_3/extract_summary.json | 94 ++++++++++++++++++ .../omop/batch_3/private/PERSON_LINKS.parquet | Bin 0 -> 4252 bytes .../PROCEDURE_OCCURRENCE_LINKS.parquet | Bin 0 -> 3061 bytes .../public/PROCEDURE_OCCURRENCE.parquet | Bin 0 -> 10545 bytes 4 files changed, 94 insertions(+) create mode 100644 test/resources/omop/batch_3/extract_summary.json create mode 100644 test/resources/omop/batch_3/private/PERSON_LINKS.parquet create mode 100644 test/resources/omop/batch_3/private/PROCEDURE_OCCURRENCE_LINKS.parquet create mode 100644 test/resources/omop/batch_3/public/PROCEDURE_OCCURRENCE.parquet diff --git a/test/resources/omop/batch_3/extract_summary.json b/test/resources/omop/batch_3/extract_summary.json new file mode 100644 index 000000000..996b7dff6 --- /dev/null +++ b/test/resources/omop/batch_3/extract_summary.json @@ -0,0 +1,94 @@ +{ + "gitsha":"56e0eba8d098523c99f3c899979096d2c5ed4c5f", + "filesummaries":[ +"CARE_SITE.parquet: 4084 bytes", +"CARE_SITE_BAD.parquet: 3305 bytes", +"CARE_SITE_LINKS.parquet: 1201 bytes", +"CDM_SOURCE.parquet: 5823 bytes", +"CDM_SOURCE_BAD.parquet: 7852 bytes", +"CONDITION_OCCURRENCE.parquet: 6770 bytes", +"CONDITION_OCCURRENCE_BAD.parquet: 5004 bytes", +"CONDITION_OCCURRENCE_LINKS.parquet: 612 bytes", +"DEVICE_EXPOSURE.parquet: 4524 bytes", +"DEVICE_EXPOSURE_BAD.parquet: 4524 bytes", +"DEVICE_EXPOSURE_LINKS.parquet: 682 bytes", +"DRUG_EXPOSURE.parquet: 5782 bytes", +"DRUG_EXPOSURE_BAD.parquet: 3907 bytes", +"DRUG_EXPOSURE_LINKS.parquet: 597 bytes", +"FACT_RELATIONSHIP.parquet: 2167 bytes", +"FACT_RELATIONSHIP_BAD.parquet: 1357 bytes", +"LOCATION.parquet: 1865 bytes", +"LOCATION_BAD.parquet: 1343 bytes", +"LOCATION_LINKS.parquet: 904 bytes", +"MEASUREMENT.parquet: 6742 bytes", +"MEASUREMENT_BAD.parquet: 3982 bytes", +"MEASUREMENT_LINKS.parquet: 2309 bytes", +"OBSERVATION.parquet: 5614 bytes", +"OBSERVATION_BAD.parquet: 3618 bytes", +"OBSERVATION_LINKS.parquet: 1263 bytes", +"OBSERVATION_PERIOD.parquet: 2183 bytes", +"OBSERVATION_PERIOD_BAD.parquet: 1488 bytes", +"OBSERVATION_PERIOD_LINKS.parquet: 606 bytes", +"PERSON.parquet: 5420 bytes", +"PERSON_BAD.parquet: 3614 bytes", +"PERSON_LINKS.parquet: 1953 bytes", 
+"PROCEDURE_OCCURRENCE.parquet: 5230 bytes", +"PROCEDURE_OCCURRENCE_BAD.parquet: 3665 bytes", +"PROCEDURE_OCCURRENCE_LINKS.parquet: 1311 bytes", +"SPECIMEN.parquet: 4873 bytes", +"SPECIMEN_BAD.parquet: 3326 bytes", +"SPECIMEN_LINKS.parquet: 928 bytes", +"VISIT_DETAIL.parquet: 3228 bytes", +"VISIT_DETAIL_BAD.parquet: 3228 bytes", +"VISIT_DETAIL_LINKS.parquet: 435 bytes", +"VISIT_OCCURRENCE.parquet: 5259 bytes", +"VISIT_OCCURRENCE_BAD.parquet: 3429 bytes", +"VISIT_OCCURRENCE_LINKS.parquet: 1349 bytes" + ], + "datetime":"2023-12-08 14:08:58", + "user":"John Watts", + "settings":{ + "site":"UCLH", + "cdm_source_name":"Test Extract - UCLH OMOP CDM", + "cdm_source_abbreviation":"Test UCLH OMOP", + "project_logic":"mock_project_settings/project_logic.R", + "min_date": 20100101, + "max_date": 20241231, + "enabled_sources":"epic", + "output_format":"parquet", + "OMOP_version": 60, + "cohort":{ + "file":"settings/mock_project_settings/mock_cohort.csv", + "exclude_NDOO": true, + "exclude_confidential": true, + "min_age_at_encounter_start": 16, + "max_age_at_encounter_start": 80 + }, + "keep_source_vals": false, + "person":{ + "include_nhs_number": false, + "include_mrn": false, + "keep_day_of_birth": false, + "keep_month_of_birth": true, + "include_gp_as_primary_care_site": false + }, + "observation_period_strategy":"visit_span", + "local_timezone":"Europe/London", + "output_timezone":"GMT", + "condition_occurrence":{ + "include_sexual_health": false, + "allow_icd_as_std": true + }, + "measurements":{ + "include_file":null, + "include_measurement_concept_ids":null, + "non_generic_numeric_labs": 3040104 + }, + "location":{ + "keep_only_zip": true, + "replace_postcode_with_LSOA": true + }, + "mapping_effective_date": 19698, + "name":"mock_project_settings" + } +} diff --git a/test/resources/omop/batch_3/private/PERSON_LINKS.parquet b/test/resources/omop/batch_3/private/PERSON_LINKS.parquet new file mode 100644 index 0000000000000000000000000000000000000000..33492439bf223e565e5a1ddc429ce65934542e44 GIT binary patch literal 4252 zcmcgwOK;*<6gGs(tJBfUXpD%&B0?H8s^)?31qOrO%?eSFxZ9~6L3be z>o2If=yFtbG0S#0%l?A?gsO`!tFF7MdhRt317@VEnOecz^ZxER=boG7l&(@_h`dWa zD8UzTpCC4)L4qLG;6oDReKPzQi`U8ZXe5NVkr0xHiANp^7+brD9SQ{2g7J5OC>h*I z6YF<(B3nJx>UYfxO(BEfQ24t*_-!D55a0Zew;dFdAcF|OUHAeh3lPtNcY*jz5D_;L zLUIIgMghYM@z1px#CxZrt*NGcIIu)XS6`}*2kPI!_%A^KJM*;Q*B5MpOil6`Ip2cV znSC|45+5jS*NG5<>1iw!_+M4P`HpW^Dz<*nhlDEgX-9EK6L$sV@%fyI(420hY z;(y(Wzq=J(;mj8;q`CxPQ1wwp?~wOrMc)qrb0dO_OAzs2H>2eCIQ+d+U6j?6%QMxw z(&TC0UWJAD_nYyrH=|(d8xMC|aNKR+9-hxWU(7~ib0V3o!of#AT+nk%yA}upiTDqn z%ph*liejn#{-mbw1L2x@n$E|QE zz8U^z{Q&{TA+p{RO-1ZSe%g2)(@aGjUC3R1V3_^bi`_3?$2!1+Rm>92rW$*^8#6^i z1%^4$b)fpTXxTtgc-`&6at|+6w4~mA354tK0y|g7Zs)yRYO1bW%=27W7q-(w;bH?* zle>zGU|{r|sh}&hRoOwIHrBV z`>8CTxYWC%y`Ngnzen()3d~0Fa{PJO4FtyLz*0rKYvK0PHOYdT5_M7Bty|zZP;XubW7rENN1q%qkbt%Yw}*TC7TuC z(W5baC1=@v_VBpRW{cNKfg2(}$8}0U@-t_bEf)|bT2uIR5Bzt8GZw`LjMPx&G6}WLV85)9(L{r5D_JbJ zg?hCGJ`#;Hz|4yX=5|-iG{5a6ujloFgYQXkQ&S zSBuzvbW8?v3;CnG81;d2c-z+($A7ydKJbRg$@v@}le6U` z{Zcv7hfsrks6kIME7!-GPreIjN-B&pr7Y^hSsrUgV8iDfwMIFTYi9tSzL3v&_vDB? 
z1%0QNWh1o`+kfhZRrm^{RwK!s3VK#lad|Rkxj9QX& z6hnu#t*dT-4jXXB8uhX_Uxrl3w3Nd}VpaVGV|3N1SCVDF9$taD!dE-8!ClLVY3${k zcJdwJsHn?(NU6Z%pzpD4F0t9z77p!3-VOwgMd2c3E7*}ljZokqQwcIOV0 zhP5N@$=Hu#p!20)K~F$mv7>S}H7vDr8tQj;c8-!ly?8D1nF03E&JD_EsS#@?(9|oS z*QAGGiDoYS482~WdC8>E@&vuyA=*Ww83=|RVWy1Ucz$9M&1ul|<`!nJa|K4oU6W}n z*mse>$G(c_ntwl``44lufxg|RnXOIyG1w9hb+12+XYTxfEyRO+0$mD+xrsg9!Y(`L zy^P2A{DWrpU~Es*XV|_%GZi|E^D3j;WfBYv=?aT)qT*Q9Wr7#PJ!0VOz@4LKdIA+ryo128hV~?tNO8_(eY#=`D-lsCYFDazy2=YX^eoP;~38!SO6H|+rVSM zSpGSE_btx71PDJ#5j+%?|8plp-&~>jqG397f1>LFFPowMTwf9#he;{_^G^Pscn0=f z8t*Im>oy%H@jffu0RNg}2U%72bcnu{rM>ur8*63iEeU&~9PPwnaVr1swG4f0h3Ym^ zRo(Xu$L@xfqI(3>Z?WX>vHVZh@_!{VSNgmRYr0*gQoFu7(Jh(Yg~Z*DUs(l;z6sik zS)u4#AU}`mKGn{^F_MXNCjIQASNQ`J?xSTn`O8i+k-wgNvvZGvhDq(ZvaQK}>hEi> za)zzxi(}O>L(BGaPqW{>%FRFqRL+y_v7URF&DpZ0gTM|=6F5JRy#Uw}p&b{;?4Zb!d~Km!i#NdHRK9)) zm{$qNUAXBNfm~{G4#LQEuLN2r9VW7I%sjA~;dt5&!4b1Z~CD}5E!BOczw3(<23%@OQ_6+5XrcmW=v zu3rP{Th1~=smYf_wtJ!+@PT4?j2A{#(U&bx7Robj>Wn$NHBl_h4=Pm^+UGeYXTFBY!q7(*D=GP zz?j%pv3Xyq;dqiO35%J?UJ|$20PKxZvDh!GO^gW{noxGZ|4bTiI5uEp<~m;~=wg-l z<*bS!Tl`e35q%(u{R#Le3nd#d1=QM*MDD6Q zUumm;=ss$e8Kt?Xhy&ceU8rTFpcFZLXP{pe?hx*iYR^c2K@Na!ooGu^M^+MBr?OB9 z$-SHI7ur)4b_T5qxmRnPlBwGG%q8NZGbl0b>Ao@Qp(eiCXXgPD&jd8Zulm`%Kxp<7 z<_%_hM`Tj8uzo#yR;K%nDfI2B%9MBLeM`Vhkn(!B-f>(o~n`C${I2Q>Aus0mQ z@euV*M+-VGEY89+Ff;SqEd6LE5}aK~QgfFV!Z+HTmLQ3}j;*(X1!ob@W%vSM9>Cc59QKa^#@=uM#}TmIf(|F(9|df{7ur&% z+tS<+fBGE##VmldC3xf=4|4H`-=PlY(mo)uHyprm6l%9TApbOHgS>8QRcL`i+qxU? zU!SKRKM#0qVII-PS~*u zgzEGcSLsi#MnHh?`CJBpJ$#%Od&9UtF^JuQw^4)vgZv?AE8v>i66HNv>iC=DFW2ec zUI$ayvd|QE%OC0~^z-){k>Cp^t>@qAahfjcL*kOCkzd}R-@7qUBd6Hr@q)e0wZ7by zb)O7?JN`3Fe-?hC9YHBJ#~!|HtJq=oZmZXUjML|op5d{`vp>H?|L!GUo}Hb^AAjA( z^Ri7t6OXSKejK5H7CBSXyEOFaD>kCwh=jfhdjA#r2e0@kXyLyvCaLgTSJ;=sUr1Vs+8-0E8{)6ZN^ub%S!)XmEdUG+V zajFCzT2E1+)z!I<4$T$v-fF|^DtVRgHU93~&@klf65*05)-46rV7t2N*Yss9hMi zuE!|U1#Q|y4_;Z*mlH%^F0`X=2Ut|)yEg%|setI{^6LY@x)s)|ffiVq&HSXEm>egI(_hrer;z#F0DOh?O2f&)+0|JNiBNX6cKmrr5oy~+ktS3(yW?o8W!gq4+u$Z>%(z< zoz`^L4wh*qd9Zh$2aRQ>GdJO==AY(i6DD~uj_mUUX0z^`@?nGkKl|&>7Kp>}q-fNNy&1a6Ju4Ml_tRqlwczP)@BP zQa*GfPH%NcR##-c!*zz3)Z0U@(`gM*JAFusa!bSB_{#0r?RXSorj;F?gSV47Mujgj zsV*Nyvg_|xbT`7|~ehS`Qc{CtmLR+w~y$udlX$uvk`I_(-CVCVX1Ji`=?Hav+G zGG1Vq0R!vc8-=W`WZ(&HhIy+2ed*TNqdj}IN?gIYyvBC<4DMHN*DHqy)r8N!xh!DM9#xmi zNg;=r7+I_&+aUj-ez$;o1IGA3VpB1xnkI4!YD(r*wkc-FxL&W8_d!Oib{FvGyK=pf zZ1Rb6n^y%Qk7dibjh%+P=fN%vVSZKz{aLs`5BXxhmS7LWYEh{sQq9^jI}}v52Yq{7 zWn0Fx*HuN~bLBl=&FX-06)+A2HPx(F%5j*x$0tfnSU*G@bvD(m8#o6wutlv}+7t82 zz+Bhnwe6!#dD1zRp18I-G;9`E#9S8Em-k1y&BwZUUMG%vwRprQl2fsI=LkMF0N%tl zj|$|xU`JiZE0b`$^`A?|`P^Y@BZIu0q*vgnT2h33c`8n)PJs_-gMpZRw)%8pK#rN5 zRJjV%z3p*SK%i z)x&u9n(Np#=VAjEVo9dhggOsmQfi2`7{r|}S1Ar_ z)nXFcPzxmCeaJ5h#MYt;HDp5R^D@?og~L*2Ww6m)moZnsT&q4vU|q&lQavK0x!xFkqb1qy9*07yp*5Fx#>j4wT>p^iFt_JCKye2H+wPF*m3F#Q#B}&+a z2CO76GPcN@@m$?HT3W*E#`Ze7dBEib8{~DDMUo+M> z*6xkYdq{)`_;j3?Ko_d)J`Z&d_&=GnH(JlEBObEE&=;BDSige{CgvYzZD8)tFn4ig znz7_%40$c17oX%)!KX|A2~Um(jcjkhg%))%Fj<1{>rkgLnSv#sDWfFDE1#g4Bp;=q zp2_((*N`vRl`-szo0$9ITN+j2wx?e$hg0<}(;mz^dIpoW@Nc(>eK)Yv!Dlin?}~wT zjRPydk>D<0!*60gSPZ^KM4`_yrN#DVrBFHPT#M-9Ww*1k{PeOemhcPOJ_j$sX zi8qg=fRAZJd^PPyG~=`%Zso>1b~h&a9iW~;E;%m>H2j@;^Gzze*ycL#^d$Y3&~D!% fKiDjqKgcW^Kh-RJ8KI~X{-gfANKsqxpC$hX54cW` literal 0 HcmV?d00001 From 74c1db1fb259c4c16dbc35bfcdd8611c9679dab8 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Wed, 31 Jan 2024 19:00:33 +0000 Subject: [PATCH 05/19] Update tests, implementation not quite there yet --- cli/src/pixl_cli/_io.py | 52 +++++++++++++++++++++---- cli/src/pixl_cli/main.py | 2 +- cli/tests/test_messages_from_parquet.py | 41 
++++++++++++++++--- 3 files changed, 82 insertions(+), 13 deletions(-) diff --git a/cli/src/pixl_cli/_io.py b/cli/src/pixl_cli/_io.py index b9a368d52..d0fb6743c 100644 --- a/cli/src/pixl_cli/_io.py +++ b/cli/src/pixl_cli/_io.py @@ -47,20 +47,58 @@ def messages_from_state_file(filepath: Path) -> list[Message]: return [deserialise(line) for line in filepath.open().readlines() if string_is_non_empty(line)] -def config_from_log_file(parquet_path: Path) -> tuple[str, datetime]: - log_file = parquet_path / "extract_summary.json" +def determine_batch_structure(extract_path: Path) -> tuple[str, datetime, list[Path]]: + log_filename = "extract_summary.json" + single_batch_logfile = extract_path / log_filename + if single_batch_logfile.exists(): + project_name, omop_es_timestamp = config_from_log_file(single_batch_logfile) + return project_name, omop_es_timestamp, [extract_path] + + # should it really be 'extract_*'? + batch_dirs = list(extract_path.glob("batch_*")) + extract_ids = [config_from_log_file(log_file / log_filename) for log_file in batch_dirs] + # There should be one or more log files, all with the same identifiers + if not extract_ids: + err = f"No batched or unbatched log files found in {extract_path}" + raise RuntimeError(err) + + distinct_extract_ids = set(extract_ids) + if len(distinct_extract_ids) != 1: + err = ( + f"Found {len(batch_dirs)} log files with different IDs: " + f"Batch dirs: {batch_dirs}, IDs: {extract_ids}" + ) + raise RuntimeError(err) + + project_name, omop_es_timestamp = distinct_extract_ids.pop() + return project_name, omop_es_timestamp, batch_dirs + + +def config_from_log_file(log_file: Path) -> tuple[str, datetime]: logs = json.load(log_file.open()) project_name = logs["settings"]["cdm_source_name"] omop_es_timestamp = datetime.fromisoformat(logs["datetime"]) return project_name, omop_es_timestamp -def copy_parquet_return_logfile_fields(parquet_path: Path) -> tuple[str, datetime]: - """Copy public parquet file to extracts directory, and return fields from logfile""" - project_name, omop_es_timestamp = config_from_log_file(parquet_path) +def copy_parquet_return_logfile_fields(extract_path: Path) -> tuple[str, datetime, list[Path]]: + """ + Copy public parquet file to extracts directory, and return fields from logfile + :param extract_path: top-level dir for the OMOP ES extract + (either a single or multi batch extract) + :returns: ( slugified project name of all the batches, + slugified timestamp of all the batches, + list of directories containing batches + - can be just the top level + directory in the case of a single batch + ) + :raises RuntimeError: if no log file can be found, or there is more than one batch and + project names or timestamps don't match. 
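+
+    Sketch of the return value for a two-batch extract (slug and timestamp taken from the
+    batch test resources; the paths are illustrative):
+
+        ("test-extract-uclh-omop-cdm",
+         datetime(2023, 12, 7, 14, 8, 58),
+         [extract_path / "batch_1", extract_path / "batch_2"])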
+ """ + project_name, omop_es_timestamp, batch_dirs = determine_batch_structure(extract_path) extract = ParquetExport(project_name, omop_es_timestamp, HOST_EXPORT_ROOT_DIR) - project_name_slug = extract.copy_to_exports(parquet_path) - return project_name_slug, omop_es_timestamp + project_name_slug = extract.copy_to_exports(extract_path) + return project_name_slug, omop_es_timestamp, batch_dirs def messages_from_parquet( diff --git a/cli/src/pixl_cli/main.py b/cli/src/pixl_cli/main.py index 10291bbc0..af64b4633 100644 --- a/cli/src/pixl_cli/main.py +++ b/cli/src/pixl_cli/main.py @@ -80,7 +80,7 @@ def populate(parquet_dir: Path, *, restart: bool, queues: str) -> None: └── extract_summary.json """ logger.info(f"Populating queue(s) {queues} from {parquet_dir}") - project_name, omop_es_datetime = copy_parquet_return_logfile_fields(parquet_dir) + project_name, omop_es_datetime, batch_dirs = copy_parquet_return_logfile_fields(parquet_dir) messages = messages_from_parquet(parquet_dir, project_name, omop_es_datetime) for queue in queues.split(","): diff --git a/cli/tests/test_messages_from_parquet.py b/cli/tests/test_messages_from_parquet.py index 0e64a3a17..541ea4477 100644 --- a/cli/tests/test_messages_from_parquet.py +++ b/cli/tests/test_messages_from_parquet.py @@ -16,25 +16,40 @@ import datetime +import pytest from core.patient_queue.message import Message from pixl_cli._io import copy_parquet_return_logfile_fields, messages_from_parquet -def test_messages_from_parquet(omop_es_batch_generator) -> None: +@pytest.mark.parametrize( + ("batches", "single_batch", "expected_message_indexes"), + [ + (["batch_1", "batch_2"], False, [0, 1]), + (["batch_1"], True, [0]), + (["batch_1"], False, [0]), + (["batch_2"], True, [1]), + (["batch_2"], False, [1]), + ], +) +def test_messages_from_parquet( + omop_es_batch_generator, batches, single_batch, expected_message_indexes +) -> None: """ Given a valid OMOP ES extract with 4 procedures, two of which are x-rays. When the messages are generated from the directory and the output of logfile parsing Then two messages should be generated """ # Arrange - omop_parquet_dir = omop_es_batch_generator() - project_name, omop_es_datetime = copy_parquet_return_logfile_fields(omop_parquet_dir) + omop_parquet_dir = omop_es_batch_generator(batches, single_batch=single_batch) + project_name, omop_es_datetime, batch_dirs = copy_parquet_return_logfile_fields( + omop_parquet_dir + ) # Act messages = messages_from_parquet(omop_parquet_dir, project_name, omop_es_datetime) # Assert assert all(isinstance(msg, Message) for msg in messages) - expected_messages = [ + all_expected_messages = [ Message( mrn="987654321", accession_number="AA12345601", @@ -52,5 +67,21 @@ def test_messages_from_parquet(omop_es_batch_generator) -> None: omop_es_timestamp=datetime.datetime.fromisoformat("2023-12-07T14:08:58"), ), ] - + expected_messages = [all_expected_messages[i] for i in expected_message_indexes] assert messages == expected_messages + + +def test_conflicting_batches(omop_es_batch_generator) -> None: + """ + Batches 1 and 3 have different timestamps so should fail if they are given to us as part of the + same extract. 
+ """ + omop_parquet_dir = omop_es_batch_generator(["batch_1", "batch_3"], single_batch=False) + with pytest.raises(RuntimeError, match=r"log files with different IDs.*batch_.*batch_"): + copy_parquet_return_logfile_fields(omop_parquet_dir) + + +def test_empty_batches(tmp_path) -> None: + """Empty dir, nothing found.""" + with pytest.raises(RuntimeError, match=r"No batched or unbatched log files found in"): + copy_parquet_return_logfile_fields(tmp_path) From b9ce649eaa9b3f02ea53831187ed4b2ecc4694f0 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Thu, 1 Feb 2024 18:46:57 +0000 Subject: [PATCH 06/19] Need to write to batch dir on export, too --- cli/tests/test_copy_omop.py | 9 +++++---- pixl_core/src/core/exports.py | 11 +++++++---- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/cli/tests/test_copy_omop.py b/cli/tests/test_copy_omop.py index 3a5cd095e..a5bbf442c 100644 --- a/cli/tests/test_copy_omop.py +++ b/cli/tests/test_copy_omop.py @@ -15,6 +15,7 @@ from __future__ import annotations import datetime +from pathlib import Path import pytest from core.exports import ParquetExport @@ -32,7 +33,7 @@ def test_new_project_copies(omop_es_batch_generator, export_dir): input_date = datetime.datetime.fromisoformat("2020-06-10T18:00:00") omop_files = ParquetExport(project_name, input_date, export_dir) # ACT - omop_files.copy_to_exports(input_dir) + omop_files.copy_to_exports(input_dir, Path()) # ASSERT output_base = omop_files.export_dir / "really-great-cool-project" @@ -64,13 +65,13 @@ def test_second_export(omop_es_batch_generator, export_dir): first_export_datetime = datetime.datetime.fromisoformat("2020-06-10T18:00:00") omop_files = ParquetExport(project_name, first_export_datetime, export_dir) - omop_files.copy_to_exports(input_dir) + omop_files.copy_to_exports(input_dir, Path()) second_export_datetime = datetime.datetime.fromisoformat("2020-07-10T18:00:00") omop_files = ParquetExport(project_name, second_export_datetime, export_dir) # ACT - omop_files.copy_to_exports(input_dir) + omop_files.copy_to_exports(input_dir, Path()) # ASSERT output_base = omop_files.export_dir / "really-great-cool-project" @@ -96,4 +97,4 @@ def test_project_with_no_public(resources, export_dir): input_date = datetime.datetime.fromisoformat("2020-06-10T18:00:00") omop_files = ParquetExport(project_name, input_date, export_dir) with pytest.raises(FileNotFoundError): - omop_files.copy_to_exports(input_dir) + omop_files.copy_to_exports(input_dir, Path()) diff --git a/pixl_core/src/core/exports.py b/pixl_core/src/core/exports.py index b9a013d2d..aa8ae75c4 100644 --- a/pixl_core/src/core/exports.py +++ b/pixl_core/src/core/exports.py @@ -58,22 +58,25 @@ def _get_slugs(project_name: str, extract_datetime: datetime.datetime) -> tuple[ extract_time_slug = slugify.slugify(extract_datetime.isoformat()) return project_slug, extract_time_slug - def copy_to_exports(self, input_omop_dir: pathlib.Path) -> str: + def copy_to_exports(self, input_omop_dir: pathlib.Path, rel_batch_name: pathlib.Path) -> str: """ Copy public omop directory as the latest extract for the project. Creates directories if they don't already exist. 
:param input_omop_dir: parent path for input omop data, with a "public" subdirectory + :param rel_batch_name: batch name which determines output subdir to copy to, or Path() if + this is a single batch extract :raises FileNotFoundError: if there is no public subdirectory in `omop_dir` :returns str: the project slug, so this can be registered for export to the DSH """ public_input = input_omop_dir / "public" # Make directory for exports if they don't exist - ParquetExport._mkdir(self.public_output) - logger.info("Copying public parquet files from %s to %s", public_input, self.public_output) + public_batch_output = self.public_output / rel_batch_name + ParquetExport._mkdir(public_batch_output) + logger.info("Copying public parquet files from %s to %s", public_input, public_batch_output) # Copy extract files, overwriting if it exists - shutil.copytree(public_input, self.public_output, dirs_exist_ok=True) + shutil.copytree(public_input, public_batch_output, dirs_exist_ok=True) # Symlink this extract to the latest directory self.latest_symlink.unlink(missing_ok=True) From f55fb92f03f3fdaf356f331aebe7692fe0747635 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Fri, 2 Feb 2024 17:14:28 +0000 Subject: [PATCH 07/19] Reading and copying extracts must be done on a batch-by-batch basis. --- cli/src/pixl_cli/_io.py | 31 +++++++++++++++++++------ cli/src/pixl_cli/main.py | 10 ++++---- cli/tests/test_messages_from_parquet.py | 4 +++- 3 files changed, 33 insertions(+), 12 deletions(-) diff --git a/cli/src/pixl_cli/_io.py b/cli/src/pixl_cli/_io.py index d0fb6743c..ebfe05326 100644 --- a/cli/src/pixl_cli/_io.py +++ b/cli/src/pixl_cli/_io.py @@ -48,15 +48,27 @@ def messages_from_state_file(filepath: Path) -> list[Message]: def determine_batch_structure(extract_path: Path) -> tuple[str, datetime, list[Path]]: + """ + Takes the top level extract directory and determines if it's a single or multi batch setup, + and returns the resultant metadata. + :param extract_path: the top level path that may contain the single batch or one or + more directories containing batches. + :returns: ( slugified project name of all the batches, + slugified timestamp of all the batches, + list of directories containing batches + - can be just the top level directory in the case of a single batch + ) + :raises RuntimeError: if config files are non-existent or contradictory + """ log_filename = "extract_summary.json" single_batch_logfile = extract_path / log_filename if single_batch_logfile.exists(): - project_name, omop_es_timestamp = config_from_log_file(single_batch_logfile) + project_name, omop_es_timestamp = _config_from_log_file(single_batch_logfile) return project_name, omop_es_timestamp, [extract_path] # should it really be 'extract_*'? batch_dirs = list(extract_path.glob("batch_*")) - extract_ids = [config_from_log_file(log_file / log_filename) for log_file in batch_dirs] + extract_ids = [_config_from_log_file(log_file / log_filename) for log_file in batch_dirs] # There should be one or more log files, all with the same identifiers if not extract_ids: err = f"No batched or unbatched log files found in {extract_path}" @@ -74,7 +86,11 @@ def determine_batch_structure(extract_path: Path) -> tuple[str, datetime, list[P return project_name, omop_es_timestamp, batch_dirs -def config_from_log_file(log_file: Path) -> tuple[str, datetime]: +def _config_from_log_file(log_file: Path) -> tuple[str, datetime]: + """ + Load a config file from the given file. 
This method has no knowledge of + multi- vs single- batch so should not be called directly. + """ logs = json.load(log_file.open()) project_name = logs["settings"]["cdm_source_name"] omop_es_timestamp = datetime.fromisoformat(logs["datetime"]) @@ -89,16 +105,17 @@ def copy_parquet_return_logfile_fields(extract_path: Path) -> tuple[str, datetim :returns: ( slugified project name of all the batches, slugified timestamp of all the batches, list of directories containing batches - - can be just the top level - directory in the case of a single batch + - can be just the top level directory in the case of a single batch ) :raises RuntimeError: if no log file can be found, or there is more than one batch and project names or timestamps don't match. """ project_name, omop_es_timestamp, batch_dirs = determine_batch_structure(extract_path) extract = ParquetExport(project_name, omop_es_timestamp, HOST_EXPORT_ROOT_DIR) - project_name_slug = extract.copy_to_exports(extract_path) - return project_name_slug, omop_es_timestamp, batch_dirs + for batch in batch_dirs: + batch_subdir = batch.relative_to(extract_path) + extract.copy_to_exports(batch, str(batch_subdir)) + return extract.project_slug, omop_es_timestamp, batch_dirs def messages_from_parquet( diff --git a/cli/src/pixl_cli/main.py b/cli/src/pixl_cli/main.py index af64b4633..9d60392fa 100644 --- a/cli/src/pixl_cli/main.py +++ b/cli/src/pixl_cli/main.py @@ -28,8 +28,8 @@ from pixl_cli._config import cli_config from pixl_cli._database import filter_exported_or_add_to_db from pixl_cli._io import ( - config_from_log_file, copy_parquet_return_logfile_fields, + determine_batch_structure, messages_from_parquet, messages_from_state_file, ) @@ -81,7 +81,9 @@ def populate(parquet_dir: Path, *, restart: bool, queues: str) -> None: """ logger.info(f"Populating queue(s) {queues} from {parquet_dir}") project_name, omop_es_datetime, batch_dirs = copy_parquet_return_logfile_fields(parquet_dir) - messages = messages_from_parquet(parquet_dir, project_name, omop_es_datetime) + messages = [] + for batch in batch_dirs: + messages.extend(messages_from_parquet(batch, project_name, omop_es_datetime)) for queue in queues.split(","): state_filepath = state_filepath_for_queue(queue) @@ -113,13 +115,13 @@ def extract_radiology_reports(parquet_dir: Path) -> None: PARQUET_DIR: Directory containing the extract_summary.json log file defining which extract to export radiology reports for. 
""" - project_name, omop_es_datetime = config_from_log_file(parquet_dir) + project_name, omop_es_timestamp, _batch_dirs = determine_batch_structure(parquet_dir) # Call the EHR API api_config = api_config_for_queue("ehr") response = requests.post( url=f"{api_config.base_url}/export-radiology-as-parquet", - json={"project_name": project_name, "extract_datetime": omop_es_datetime.isoformat()}, + json={"project_name": project_name, "extract_datetime": omop_es_timestamp.isoformat()}, timeout=10, ) diff --git a/cli/tests/test_messages_from_parquet.py b/cli/tests/test_messages_from_parquet.py index 541ea4477..5a527fdde 100644 --- a/cli/tests/test_messages_from_parquet.py +++ b/cli/tests/test_messages_from_parquet.py @@ -45,7 +45,9 @@ def test_messages_from_parquet( omop_parquet_dir ) # Act - messages = messages_from_parquet(omop_parquet_dir, project_name, omop_es_datetime) + messages = [] + for batch in batch_dirs: + messages.extend(messages_from_parquet(batch, project_name, omop_es_datetime)) # Assert assert all(isinstance(msg, Message) for msg in messages) From c68d1f322ba27c4669c7606826e5e9524136458c Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Fri, 2 Feb 2024 17:22:15 +0000 Subject: [PATCH 08/19] Set up the system test so it has all the original data, which is now split over two batches. --- test/run-system-test.sh | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/test/run-system-test.sh b/test/run-system-test.sh index f0efeb87a..5ce10eb2a 100755 --- a/test/run-system-test.sh +++ b/test/run-system-test.sh @@ -28,14 +28,17 @@ docker compose --env-file .env.test -p system-test up --wait -d --build --remove ./scripts/insert_test_data.sh pip install -e "${PACKAGE_DIR}/pixl_core" && pip install -e "${PACKAGE_DIR}/cli" -pixl populate "${PACKAGE_DIR}/test/resources/omop/batch_1" +# set up test input data - choose the two "good" batches +INPUT_TEMP_DIR=$(mktemp -d) +cp -a "${PACKAGE_DIR}/test/resources/omop/batch_"[12] "$INPUT_TEMP_DIR" +pixl populate "$INPUT_TEMP_DIR" pixl start sleep 65 # need to wait until the DICOM image is "stable" = 60s ./scripts/check_entry_in_pixl_anon.sh ./scripts/check_entry_in_orthanc_anon.py ./scripts/check_max_storage_in_orthanc_raw.sh -pixl extract-radiology-reports "${PACKAGE_DIR}/test/resources/omop/batch_1" +pixl extract-radiology-reports "$INPUT_TEMP_DIR" ./scripts/check_radiology_parquet.py \ ../exports/test-extract-uclh-omop-cdm/latest/radiology/radiology.parquet From 036249697b2785625da3d729d01245adc9396762 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Fri, 2 Feb 2024 18:31:49 +0000 Subject: [PATCH 09/19] Explain what the different batches are for --- test/README.md | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/test/README.md b/test/README.md index f524ed1d7..dfa114228 100644 --- a/test/README.md +++ b/test/README.md @@ -35,7 +35,8 @@ A test `pixl_config.yml` file is provided to run the PIXL pipeline. ### Dummy services `./dummy-services` contains a Python script and Dockerfile to mock the CogStack service, used for -de-identification of the radiology reports in the EHR API. +de-identification of the radiology reports in the EHR API, and an FTP server for testing the +DSH upload. #### FTP server @@ -65,8 +66,14 @@ and are copied into `/etc/ssl/private` when building the Docker container. ### Resources - `./resources/` provides 2 mock DICOM images used to populate the mock VNA. 
-- `./resources/omop` contains mock public and private Parquet files used to populate the message - queues and extract the radiology reports +- `./resources/omop` contains several extract batches, each containing a config (log) file and + mock public and private Parquet files used to populate the message + queues and extract the radiology reports. + System and unit tests are able to set up some subset of these + batches depending on what they're testing. `batch_1` and `batch_2` together form the happy + path for the system test. `batch_3` has a mismatching timestamp so is supposed + to fail when used with 1 or 2. Any batch can also be used alone as a single-batch (old style) + extract. ### VNA config From 6fba6c5efc1b30da97178d8822f7c1c38ef0107f Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Fri, 2 Feb 2024 19:05:44 +0000 Subject: [PATCH 10/19] Minor typing error --- cli/src/pixl_cli/_io.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/src/pixl_cli/_io.py b/cli/src/pixl_cli/_io.py index ebfe05326..4884f2f95 100644 --- a/cli/src/pixl_cli/_io.py +++ b/cli/src/pixl_cli/_io.py @@ -114,7 +114,7 @@ def copy_parquet_return_logfile_fields(extract_path: Path) -> tuple[str, datetim extract = ParquetExport(project_name, omop_es_timestamp, HOST_EXPORT_ROOT_DIR) for batch in batch_dirs: batch_subdir = batch.relative_to(extract_path) - extract.copy_to_exports(batch, str(batch_subdir)) + extract.copy_to_exports(batch, batch_subdir) return extract.project_slug, omop_es_timestamp, batch_dirs From 0d15136894921ad5dc6fdb455ca2066108f220af Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Fri, 2 Feb 2024 19:16:40 +0000 Subject: [PATCH 11/19] Increase test coverage slightly; not sure if worth it. --- cli/tests/test_messages_from_parquet.py | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/cli/tests/test_messages_from_parquet.py b/cli/tests/test_messages_from_parquet.py index 5a527fdde..5bf029248 100644 --- a/cli/tests/test_messages_from_parquet.py +++ b/cli/tests/test_messages_from_parquet.py @@ -15,10 +15,15 @@ from __future__ import annotations import datetime +import shutil import pytest from core.patient_queue.message import Message -from pixl_cli._io import copy_parquet_return_logfile_fields, messages_from_parquet +from pixl_cli._io import ( + copy_parquet_return_logfile_fields, + determine_batch_structure, + messages_from_parquet, +) @pytest.mark.parametrize( @@ -87,3 +92,17 @@ def test_empty_batches(tmp_path) -> None: """Empty dir, nothing found.""" with pytest.raises(RuntimeError, match=r"No batched or unbatched log files found in"): copy_parquet_return_logfile_fields(tmp_path) + + +def test_missing_public(omop_es_batch_generator) -> None: + """ + This error is hard to reach in real life because a missing public dir would trigger an error + from copy_parquet_return_logfile_fields first. + """ + omop_parquet_dir = omop_es_batch_generator(["batch_1"], single_batch=True) + # simulate broken input batch + shutil.rmtree(omop_parquet_dir / "public", ignore_errors=False) + + project_name, omop_es_datetime, batch_dirs = determine_batch_structure(omop_parquet_dir) + with pytest.raises(NotADirectoryError): + messages_from_parquet(omop_parquet_dir, project_name, omop_es_datetime) From d253f34a85a3febe9b5d6dbabe9508a23bf4a3bb Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Mon, 5 Feb 2024 16:40:38 +0000 Subject: [PATCH 12/19] Review suggestions: log batch details, sort globbed batch dirs. 
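`Path.glob()` returns entries in a filesystem-dependent order, so sorting the `batch_*` directories makes any "contradictory batches" failure repeatable. The following is a minimal standalone sketch of the discovery and consistency checks that `determine_batch_structure` performs; the helper names are illustrative, not the module's own.

```python
# Illustrative sketch only - the real logic lives in cli/src/pixl_cli/_io.py.
from datetime import datetime
from pathlib import Path


def discover_batches(extract_path: Path) -> list[Path]:
    """Sorted so that processing order and failure messages are repeatable."""
    return sorted(extract_path.glob("batch_*"))


def common_identifier(extract_ids: list[tuple[str, datetime]]) -> tuple[str, datetime]:
    """Every batch must report the same (project name, timestamp) pair."""
    distinct = set(extract_ids)
    if not distinct:
        msg = "No batched or unbatched log files found"
        raise RuntimeError(msg)
    if len(distinct) > 1:
        msg = f"Found conflicting batches: {distinct}"
        raise RuntimeError(msg)
    return distinct.pop()
```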
--- cli/src/pixl_cli/_io.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cli/src/pixl_cli/_io.py b/cli/src/pixl_cli/_io.py index 4884f2f95..a6edbf8d5 100644 --- a/cli/src/pixl_cli/_io.py +++ b/cli/src/pixl_cli/_io.py @@ -67,7 +67,8 @@ def determine_batch_structure(extract_path: Path) -> tuple[str, datetime, list[P return project_name, omop_es_timestamp, [extract_path] # should it really be 'extract_*'? - batch_dirs = list(extract_path.glob("batch_*")) + # The order doesn't matter functionally, but sort here so failures are more repeatable. + batch_dirs = sorted(extract_path.glob("batch_*")) extract_ids = [_config_from_log_file(log_file / log_filename) for log_file in batch_dirs] # There should be one or more log files, all with the same identifiers if not extract_ids: @@ -82,6 +83,7 @@ def determine_batch_structure(extract_path: Path) -> tuple[str, datetime, list[P ) raise RuntimeError(err) + logger.info(f"Batches found: ({len(batch_dirs)}) {batch_dirs}") project_name, omop_es_timestamp = distinct_extract_ids.pop() return project_name, omop_es_timestamp, batch_dirs @@ -114,7 +116,9 @@ def copy_parquet_return_logfile_fields(extract_path: Path) -> tuple[str, datetim extract = ParquetExport(project_name, omop_es_timestamp, HOST_EXPORT_ROOT_DIR) for batch in batch_dirs: batch_subdir = batch.relative_to(extract_path) + logger.info(f"Copying extract files from batch at {batch}...") extract.copy_to_exports(batch, batch_subdir) + logger.info(f"Done copying extract files from batch at {batch}") return extract.project_slug, omop_es_timestamp, batch_dirs From 6cff5a27efc50fef62e0f550db5d5e888311270c Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Mon, 5 Feb 2024 17:12:11 +0000 Subject: [PATCH 13/19] Document the single and multi batch extracts that are accepted by the CLI --- cli/README.md | 39 ++++++++++++++++++++++++++++++--------- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/cli/README.md b/cli/README.md index c09ea7bb5..ca7c8d1ce 100644 --- a/cli/README.md +++ b/cli/README.md @@ -44,21 +44,42 @@ pixl --help Populate queue for PACS and EHR extraction ```bash -pixl populate +pixl populate PARQUET_DIR ``` -where `parquet_dir` contains at least the following files: +where `PARQUET_DIR` has one of the two following setups, which the CLI will automatically detect: +* In single batch mode: ```sh -parquet_dir -├── extract_summary.json -├── private -│ ├── PERSON_LINKS.parquet -│ └── PROCEDURE_OCCURRENCE_LINKS.parquet -└── public - └── PROCEDURE_OCCURRENCE.parquet +└── PARQUET_DIR + ├── extract_summary.json + ├── private + │ ├── PERSON_LINKS.parquet + │ └── PROCEDURE_OCCURRENCE_LINKS.parquet + └── public + └── PROCEDURE_OCCURRENCE.parquet ``` +* In multi-batch configuration: +```sh +└── PARQUET_DIR + ├── batch_1 + │ ├── extract_summary.json + │ ├── private + │ │ ├── PERSON_LINKS.parquet + │ │ └── PROCEDURE_OCCURRENCE_LINKS.parquet + │ └── public + │ └── PROCEDURE_OCCURRENCE.parquet + └── batch_2 + ├── extract_summary.json + ├── private + │ ├── PERSON_LINKS.parquet + │ └── PROCEDURE_OCCURRENCE_LINKS.parquet + └── public + └── PROCEDURE_OCCURRENCE.parquet +``` + + Start the imaging extraction ```bash From 21db43dd0eb43688845c0b60ca3ee949b65f136f Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Mon, 5 Feb 2024 18:51:36 +0000 Subject: [PATCH 14/19] Check that truncated parquet files stop the process. 
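Parquet files end with a fixed magic footer, so removing even the final byte leaves the file unreadable. Below is a minimal sketch of the corrupt-then-read step the new test relies on, assuming the pandas/pyarrow stack used elsewhere in this series; the file path is a placeholder.

```python
# Sketch of the truncation technique: drop one byte so the Parquet magic footer
# is incomplete and any attempt to read the file fails fast.
# The path below is a placeholder, not a file this test actually writes.
import os
from pathlib import Path

import pandas as pd
from pyarrow import ArrowException

parquet_file = Path("batch_2/public/PROCEDURE_OCCURRENCE.parquet")
os.truncate(parquet_file, parquet_file.stat().st_size - 1)

try:
    pd.read_parquet(parquet_file)  # assumes the pyarrow engine is selected
except ArrowException:
    print("truncated file rejected, as the test expects")
```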
--- cli/tests/test_messages_from_parquet.py | 32 +++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/cli/tests/test_messages_from_parquet.py b/cli/tests/test_messages_from_parquet.py index 5bf029248..cf11ed20d 100644 --- a/cli/tests/test_messages_from_parquet.py +++ b/cli/tests/test_messages_from_parquet.py @@ -15,6 +15,7 @@ from __future__ import annotations import datetime +import os import shutil import pytest @@ -24,6 +25,7 @@ determine_batch_structure, messages_from_parquet, ) +from pyarrow import ArrowException @pytest.mark.parametrize( @@ -106,3 +108,33 @@ def test_missing_public(omop_es_batch_generator) -> None: project_name, omop_es_datetime, batch_dirs = determine_batch_structure(omop_parquet_dir) with pytest.raises(NotADirectoryError): messages_from_parquet(omop_parquet_dir, project_name, omop_es_datetime) + + +@pytest.mark.parametrize( + "file_to_corrupt", + [ + # include all parquet files required to generate messages + "public/PROCEDURE_OCCURRENCE.parquet", + "private/PROCEDURE_OCCURRENCE_LINKS.parquet", + "private/PERSON_LINKS.parquet", + ], +) +def test_broken_parquet_dir(omop_es_batch_generator, file_to_corrupt) -> None: + """ + Check that if any of the parquet files we use to generate messages are incomplete, + nothing much happens. We aren't checking the validity of parquet files that we simply copy + to the extract dir. + We fail even if batch_1 is fine but batch_2 is faulty; this might be an argument + for continuing to call messages_from_parquet on *all* batches before sending any messages, + as long as that's not too slow or uses too much memory. + """ + omop_parquet_dir = omop_es_batch_generator(["batch_1", "batch_2"], single_batch=False) + # Assume most likely error is an incomplete copy. Because parquet has a magic footer, + # even one byte missing at the end should be enough for it to fail. + to_corrupt = omop_parquet_dir / "batch_2" / file_to_corrupt + new_size = to_corrupt.stat().st_size - 1 + os.truncate(to_corrupt, new_size) + project_name, omop_es_datetime, batch_dirs = determine_batch_structure(omop_parquet_dir) + with pytest.raises(ArrowException): # noqa: PT012 It may not fail on all, but must fail on one + for b in batch_dirs: + messages_from_parquet(b, project_name, omop_es_datetime) From 9cb88d1e8c794ce21180d11c4c9e00293e26bcc3 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Mon, 5 Feb 2024 19:26:27 +0000 Subject: [PATCH 15/19] Fix up merge, new code in main needs to be OMOP ES batch-aware. --- pixl_core/tests/test_upload.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pixl_core/tests/test_upload.py b/pixl_core/tests/test_upload.py index 5e05d73b7..ab1938ba3 100644 --- a/pixl_core/tests/test_upload.py +++ b/pixl_core/tests/test_upload.py @@ -74,7 +74,8 @@ def test_upload_parquet(parquet_export, mounted_data) -> None: # ARRANGE parquet_export.copy_to_exports( - pathlib.Path(__file__).parents[2] / "test" / "resources" / "omop" + pathlib.Path(__file__).parents[2] / "test" / "resources" / "omop" / "batch_1", + pathlib.Path(), ) with (parquet_export.public_output.parent / "radiology.parquet").open("w") as handle: handle.writelines(["dummy data"]) From 2b3a33b86706ea767f2dd72b3da383d76b1452b6 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Wed, 7 Feb 2024 16:50:40 +0000 Subject: [PATCH 16/19] Fix test to be batch aware. It now (correctly) fails. 
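Condensed from the reworked test body below: each batch is copied into the export area under its own relative subdirectory, so the batch split survives through to the FTPS upload. The project name, timestamp and paths in this sketch are placeholders, not values the test uses.

```python
# Condensed arrange step, using the ParquetExport API as exercised in this
# series; project name, timestamp and paths are placeholders.
import datetime
import pathlib

import pandas as pd
from core.exports import ParquetExport

export_root = pathlib.Path("exports")              # placeholder export root
omop_input = pathlib.Path("copied_omop_es_input")  # as produced by omop_es_batch_generator

export = ParquetExport(
    "Some project name",  # placeholder
    datetime.datetime.fromisoformat("2024-02-07T00:00:00"),
    export_root,
)
for batch in ["batch_1", "batch_2"]:
    # second argument is the batch-relative output subdir, which preserves the split
    export.copy_to_exports(omop_input / batch, pathlib.Path(batch))
export.export_radiology(pd.DataFrame(list("dummy"), columns=["D"]))
```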
--- pixl_core/tests/conftest.py | 38 +++++++++++++++++++++++++++++- pixl_core/tests/test_upload.py | 42 ++++++++++++++++++++++------------ 2 files changed, 65 insertions(+), 15 deletions(-) diff --git a/pixl_core/tests/conftest.py b/pixl_core/tests/conftest.py index 41bf6d75b..007a947e2 100644 --- a/pixl_core/tests/conftest.py +++ b/pixl_core/tests/conftest.py @@ -16,9 +16,10 @@ import datetime import os import pathlib +import shutil import subprocess from pathlib import Path -from typing import BinaryIO +from typing import BinaryIO, Callable import pytest from core.db.models import Base, Extract, Image @@ -42,6 +43,41 @@ STUDY_DATE = datetime.date.fromisoformat("2023-01-01") +@pytest.fixture() +def resources() -> pathlib.Path: + """Top-level test resources directory path.""" + return pathlib.Path(__file__).parents[2] / "test" / "resources" + + +@pytest.fixture() +def omop_es_batch_generator(resources, tmp_path_factory) -> Callable[..., pathlib.Path]: + """ + return a callable which returns, by default, a path to (a copy of) the + resources/omop/batch_1/ directory, as if it were a single batch. + You can also set up any subset of the resources/omop/batch_* directories to be present + in the returned directory. Useful for testing different setups without having a load of + copied files in the resources/omop directory. + """ + omop_batch_root = resources / "omop" + # keep separate from a test that might want to use tmp_path + tmp = tmp_path_factory.mktemp("copied_omop_es_input") + + def inner_gen(batches=None, *, single_batch: bool = True) -> pathlib.Path: + if batches is None: + batches = ["batch_1"] + if single_batch: + assert len(batches) == 1 + # the root tmp dir will already exist; we are effectively replacing it + shutil.copytree(omop_batch_root / batches[0], tmp, dirs_exist_ok=True) + else: + assert batches + for b in batches: + shutil.copytree(omop_batch_root / b, tmp / b) + return tmp + + return inner_gen + + @pytest.fixture(scope="package") def run_containers() -> subprocess.CompletedProcess[bytes]: """Run docker containers for tests which require them.""" diff --git a/pixl_core/tests/test_upload.py b/pixl_core/tests/test_upload.py index ab1938ba3..44bf5de6d 100644 --- a/pixl_core/tests/test_upload.py +++ b/pixl_core/tests/test_upload.py @@ -12,11 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. 
"""Test functionality to upload files to an endpoint.""" - - +import filecmp import pathlib from datetime import datetime, timezone +import pandas as pd import pytest from core.db.models import Image from core.db.queries import get_project_slug_from_db, update_exported_at @@ -69,26 +69,40 @@ def test_update_exported_and_save(rows_in_session) -> None: @pytest.mark.usefixtures("run_containers") -def test_upload_parquet(parquet_export, mounted_data) -> None: - """Tests that parquet files are uploaded to the correct location""" - # ARRANGE - - parquet_export.copy_to_exports( - pathlib.Path(__file__).parents[2] / "test" / "resources" / "omop" / "batch_1", - pathlib.Path(), - ) - with (parquet_export.public_output.parent / "radiology.parquet").open("w") as handle: - handle.writelines(["dummy data"]) +def test_upload_parquet(omop_es_batch_generator, parquet_export, mounted_data) -> None: + """ + GIVEN an OMOP-ES extract of two batches has been added to the export dir + WHEN parquet files are uploaded via FTPS + THEN the structure of the batch should be preserved on the FTP server + """ + # ARRANGE - a ready to upload directory containing copied OMOP extract and radiology reports + batch_names = ["batch_1", "batch_2"] + omop_input_path = omop_es_batch_generator(batch_names, single_batch=False) + for batch in batch_names: + parquet_export.copy_to_exports( + omop_input_path / batch, + pathlib.Path(batch), + ) + parquet_export.export_radiology(pd.DataFrame(list("dummy"), columns=["D"])) # ACT upload_parquet_files(parquet_export) + # ASSERT expected_public_parquet_dir = ( mounted_data / parquet_export.project_slug / parquet_export.extract_time_slug / "parquet" ) assert expected_public_parquet_dir.exists() - assert (expected_public_parquet_dir / "PROCEDURE_OCCURRENCE.parquet").exists() - assert (expected_public_parquet_dir / "radiology.parquet").exists() + # Print difference report to aid debugging (it doesn't actually assert anything) + dc = filecmp.dircmp(parquet_export.current_extract_base, expected_public_parquet_dir) + dc.report_full_closure() + assert ( + expected_public_parquet_dir / "batch_1" / "public" / "PROCEDURE_OCCURRENCE.parquet" + ).exists() + assert ( + expected_public_parquet_dir / "batch_2" / "public" / "PROCEDURE_OCCURRENCE.parquet" + ).exists() + assert (expected_public_parquet_dir / "radiology" / "radiology.parquet").exists() @pytest.mark.usefixtures("run_containers") From 99c9d5840048c81e2673df79345cb94262675a4e Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Wed, 7 Feb 2024 16:52:56 +0000 Subject: [PATCH 17/19] Capture docker output for easier debugging --- pixl_core/tests/conftest.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pixl_core/tests/conftest.py b/pixl_core/tests/conftest.py index 007a947e2..92b4c3b25 100644 --- a/pixl_core/tests/conftest.py +++ b/pixl_core/tests/conftest.py @@ -87,6 +87,7 @@ def run_containers() -> subprocess.CompletedProcess[bytes]: cwd=TEST_DIR, shell=True, # noqa: S602 timeout=60, + capture_output=True, ) subprocess.run( b"docker compose down --volumes", @@ -94,6 +95,7 @@ def run_containers() -> subprocess.CompletedProcess[bytes]: cwd=TEST_DIR, shell=True, # noqa: S602 timeout=60, + capture_output=True, ) From f871583cd851711ead131d7ca3b504ee6db0ddc7 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Wed, 7 Feb 2024 17:57:35 +0000 Subject: [PATCH 18/19] Missed a dependency --- pixl_core/pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pixl_core/pyproject.toml b/pixl_core/pyproject.toml index c0ed4dc48..4a0326765 100644 --- 
a/pixl_core/pyproject.toml +++ b/pixl_core/pyproject.toml @@ -23,6 +23,7 @@ dependencies = [ "sqlalchemy==2.0.24", "psycopg2-binary==2.9.9", "pandas==1.5.1", + "pyarrow==14.0.1", ] [project.optional-dependencies] From 421f91f5f2f0fc66cee4d61828d34cb1fcd93855 Mon Sep 17 00:00:00 2001 From: Jeremy Stein Date: Thu, 8 Feb 2024 18:19:24 +0000 Subject: [PATCH 19/19] FTP upload everything as found. Document the files present at each stage of the pipeline - probably needs some changing to make more consistent! --- README.md | 39 ++++++++++++++++++++++++++++++++ pixl_core/src/core/upload.py | 41 +++++++++++++++++++++++++++------- pixl_core/tests/test_upload.py | 4 ++-- 3 files changed, 74 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 25933c463..663a4846b 100644 --- a/README.md +++ b/README.md @@ -248,3 +248,42 @@ PIXL data extracts include the below assumptions - (MRN, Accession number) is unique identifier for a report/DICOM study pair - Patients have a single _relevant_ MRN + + + +## File journey overview +Files that are present at each step of the pipeline. + +### Resources in source repo (for test only) +For building test versions of the OMOP ES extract dir. +``` +test/resources/omop/batch_1/... +....................batch_2/... +....................batch_3/public /*.parquet +............................private/*.parquet +............................extract_summary.json +``` + +### OMOP ES extract dir (input to PIXL) +As passed to CLI. +Square brackets (eg. `[batch_1]`) denote optional intermediate directory. +``` +EXTRACT_DIR/[batch_1]/... +.........../[batch_2]/public /*.parquet +......................private/*.parquet +......................extract_summary.json +``` + +### PIXL Export dir (PIXL intermediate) +``` +EXPORT_ROOT/PROJECT_SLUG/all_extracts/EXTRACT_DATETIME/radiology/radiology.parquet +....................................................../omop/public/[batch_1]/*.parquet +...................................................................[batch_2]/*.parquet +``` + +### FTP server +``` +FTPROOT/PROJECT_SLUG/EXTRACT_DATETIME/parquet/radiology/radiology.parquet +..............................................omop/public/[batch_1]/*.parquet +..........................................................[batch_2]/*.parquet +``` diff --git a/pixl_core/src/core/upload.py b/pixl_core/src/core/upload.py index fdcda10ac..3c60c9ad9 100644 --- a/pixl_core/src/core/upload.py +++ b/pixl_core/src/core/upload.py @@ -21,6 +21,7 @@ import ssl from datetime import datetime, timezone from ftplib import FTP_TLS +from pathlib import Path from typing import TYPE_CHECKING, Any, BinaryIO if TYPE_CHECKING: @@ -87,23 +88,34 @@ def upload_dicom_image(zip_content: BinaryIO, pseudo_anon_id: str) -> None: def upload_parquet_files(parquet_export: ParquetExport) -> None: """Upload parquet to FTPS under //parquet.""" - current_extract = parquet_export.public_output.parents[1] + source_root_dir = parquet_export.current_extract_base # Create the remote directory if it doesn't exist ftp = _connect_to_ftp() _create_and_set_as_cwd(ftp, parquet_export.project_slug) _create_and_set_as_cwd(ftp, parquet_export.extract_time_slug) _create_and_set_as_cwd(ftp, "parquet") - export_files = [x for x in current_extract.rglob("*.parquet") if x.is_file()] - if not export_files: - msg = f"No files found in {current_extract}" + # get the upload root directory before we do anything as we'll need + # to return to it (will it always be absolute?) 
+ upload_root_dir = Path(ftp.pwd()) + if not upload_root_dir.is_absolute(): + logger.error("server remote path is not absolute, what are we going to do?") + + # absolute paths of the source + source_files = [x for x in source_root_dir.rglob("*.parquet") if x.is_file()] + if not source_files: + msg = f"No files found in {source_root_dir}" raise FileNotFoundError(msg) # throw exception if empty dir - for path in export_files: - with path.open("rb") as handle: - command = f"STOR {path.stem}.parquet" - logger.debug("Running %s", command) + for source_path in source_files: + _create_and_set_as_cwd(ftp, str(upload_root_dir)) + source_rel_path = source_path.relative_to(source_root_dir) + source_rel_dir = source_rel_path.parent + source_filename_only = source_rel_path.relative_to(source_rel_dir) + _create_and_set_as_cwd_multi_path(ftp, source_rel_dir) + with source_path.open("rb") as handle: + command = f"STOR {source_filename_only}" # Store the file using a binary handler ftp.storbinary(command, handle) @@ -132,6 +144,19 @@ def _connect_to_ftp() -> FTP_TLS: return ftp +def _create_and_set_as_cwd_multi_path(ftp: FTP_TLS, remote_multi_dir: Path) -> None: + """Create (and cwd into) a multi dir path, analogously to mkdir -p""" + if remote_multi_dir.is_absolute(): + # would require some special handling and we don't need it + err = "must be relative path" + raise ValueError(err) + logger.info("_create_and_set_as_cwd_multi_path %s", remote_multi_dir) + # path should be pretty normalised, so assume split is safe + sub_dirs = str(remote_multi_dir).split("/") + for sd in sub_dirs: + _create_and_set_as_cwd(ftp, sd) + + def _create_and_set_as_cwd(ftp: FTP_TLS, project_dir: str) -> None: try: ftp.cwd(project_dir) diff --git a/pixl_core/tests/test_upload.py b/pixl_core/tests/test_upload.py index 44bf5de6d..2a510097a 100644 --- a/pixl_core/tests/test_upload.py +++ b/pixl_core/tests/test_upload.py @@ -97,10 +97,10 @@ def test_upload_parquet(omop_es_batch_generator, parquet_export, mounted_data) - dc = filecmp.dircmp(parquet_export.current_extract_base, expected_public_parquet_dir) dc.report_full_closure() assert ( - expected_public_parquet_dir / "batch_1" / "public" / "PROCEDURE_OCCURRENCE.parquet" + expected_public_parquet_dir / "omop" / "public" / "batch_1" / "PROCEDURE_OCCURRENCE.parquet" ).exists() assert ( - expected_public_parquet_dir / "batch_2" / "public" / "PROCEDURE_OCCURRENCE.parquet" + expected_public_parquet_dir / "omop" / "public" / "batch_2" / "PROCEDURE_OCCURRENCE.parquet" ).exists() assert (expected_public_parquet_dir / "radiology" / "radiology.parquet").exists()
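The final patch recreates each file's relative directory on the FTP server one component at a time, since FTP's `MKD` has no recursive form. Below is a minimal standalone sketch of that walk, assuming an already-authenticated `ftplib.FTP_TLS` client; the helper mirrors the role of `_create_and_set_as_cwd_multi_path` but deliberately simplifies the error handling.

```python
# Minimal "mkdir -p over FTP" sketch: cwd into each path component, creating it
# when the cwd fails. Assumes an already-authenticated FTP_TLS client; error
# handling is simplified compared with the patched upload module.
from ftplib import FTP_TLS, error_perm
from pathlib import PurePosixPath


def ensure_remote_dir(ftp: FTP_TLS, relative_dir: PurePosixPath) -> None:
    """Create (and change into) every component of a relative remote path."""
    if relative_dir.is_absolute():
        msg = "must be a relative path"
        raise ValueError(msg)
    for part in relative_dir.parts:
        try:
            ftp.cwd(part)
        except error_perm:
            ftp.mkd(part)
            ftp.cwd(part)
```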