Add FTPS server to system test and test successful upload (#268)
* Update `pixl_ehr` README, not actually using cogstack Docker service

* Move cogstack dummy service to subdirectory

* Move ftp-server to `test/dummy-services`

* Add ftp-server to system-test docker compose

* Add `SendViaFTPS` to `OnChange` callback for `orthanc-anon`

* Check FTPS upload in system test

* Fix `check_ftps_upload` and log expected export files

* Trigger system test on CI

* Typo fix

Co-authored-by: Peter Tsrunchev <[email protected]>

* Rename endpoint uploading env variable and set default to `false`

* Remove type annotations for `OnChange` again

* Fix `glob_list` creation

Co-authored-by: Jeremy Stein <[email protected]>

* Make container naming consistent

* Rename mount data env variable and expand it

* Print uploaded files instead of logging

* Hardcode project name slug

* Disable Azure related code in `orthanc-anon` plugin if no Azure available

* fix: get the correct pseudo-anon ID before sending via FTPS

* Add missing env variables

* fix: check for env var correctly

* More bug fixes

* More consistent naming of env variable

* Rename query helper function

* Make helper functions snakecase

* Also update the `.env` files

* Make orthanc plugin code more type safe and system exit when query fails

* More type safety

* Define the `FTP_PORT` environment variable

* Document the FTPS server requirements

Should have done this way earlier!

* Markdown formatting

* Fix FTP config

* Add more FTP logging

* Allow Python 3.11 for CLI

* Ignore new data mount location

* Allow system tests to pass

* DEBUG: check ftp logs

* Poll for two minutes instead of hardcoded wait

* Poll for two minutes instead of hardcoded wait

* More debugging

* Reorder system test scripts

* For argument's sake, try polling for images for 4 minutes

* Increase timeout

* Use logger rather than logging directly

* Increase timeout

* Add comment about number of expected studies

* Clean up ftp output directory after check

* Actually wait for the desired number of seconds

* Empty-Commit to test building

* Fix CI for ftps upload

* Remove logs check

* Reorder system tests to make parquet upload export testing easier

* Empty-Commit to test building

* Wait until there are two exported DICOM messages

In case we hit CI when there is only one

* Document FTPS server specifics

* Try to make ftp server more robust in CI

* Copy certs into dockerfile instead of mounting

Could this fix our volume error? Who knows

* Copy certs into dockerfile instead of mounting

Could this fix our volume error? Who knows

* Update core test compose file too

* Put "when" and "then" statements on their own lines

* docs: SSL certificates are now copied instead of mounted

---------

Co-authored-by: Peter Tsrunchev <[email protected]>
Co-authored-by: Jeremy Stein <[email protected]>
Co-authored-by: Stef Piatek <[email protected]>
4 people authored Feb 2, 2024
1 parent 82e4738 commit 428c3a5
Showing 25 changed files with 364 additions and 144 deletions.
12 changes: 9 additions & 3 deletions .env.sample
@@ -43,16 +43,16 @@ ORTHANC_RAW_MAXIMUM_STORAGE_SIZE= // MB

# PIXL Orthanc anon instance
ORTHANC_ANON_USERNAME=
ORTHANC_ANON_PASSWORD=
ORTHANC_ANON_PASSWORD=
ORTHANC_ANON_AE_TITLE=
ORTHANC_ANON_HTTP_TIMEOUT=60
ORTHANC_AUTOROUTE_ANON_TO_AZURE=false
ORTHANC_AUTOROUTE_ANON_TO_ENDPOINT=false
ENABLE_DICOM_WEB=true
STUDY_TIME_OFFSET=
SALT_VALUE=PIXL


# UCVNAQR DICOM node information
# UCVNAQR DICOM node information
VNAQR_AE_TITLE=
VNAQR_DICOM_PORT=
VNAQR_IP_ADDR=
Expand Down Expand Up @@ -83,3 +83,9 @@ RABBITMQ_PASSWORD=
# PACS extraction API
PIXL_DICOM_TRANSFER_TIMEOUT=240
PIXL_QUERY_TIMEOUT=10

# FTP server
FTP_HOST=
FTP_USER_NAME=
FTP_USER_PASSWORD=
FTP_PORT=
5 changes: 5 additions & 0 deletions .github/workflows/main.yml
@@ -211,6 +211,11 @@ jobs:
timeout-minutes: 30
steps:
- uses: actions/checkout@v3
- name: Prune docker volumes
# seems like we're getting an error from ftp-server exiting with zero status code
# this is a workaround that resolves it locally 🤞
run: |
docker volume prune --force
- uses: docker/setup-buildx-action@v3
# pre-build and cache the postgres container as installing python3 takes a while, doesn't push
2 changes: 1 addition & 1 deletion .gitignore
@@ -148,4 +148,4 @@ dmypy.json
# project specific files
/exports/
**/test_producer.csv
pixl_core/tests/ftp-server/mounts/data/*
/test/dummy-services/ftp-server/mounts/data/*
2 changes: 1 addition & 1 deletion cli/pyproject.toml
@@ -6,7 +6,7 @@ authors = [
]
description = "PIXL command line interface"
readme = "README.md"
requires-python = "<3.11"
requires-python = "<3.12"
classifiers = [
"Programming Language :: Python :: 3"
]
12 changes: 9 additions & 3 deletions docker-compose.yml
@@ -55,6 +55,12 @@ x-pixl-db: &pixl-db
PIXL_DB_PASSWORD: ${PIXL_DB_PASSWORD}
PIXL_DB_NAME: ${PIXL_DB_NAME}

x-ftp-host: &ftp-host
FTP_HOST: ${FTP_HOST}
FTP_USER_NAME: ${FTP_USER_NAME}
FTP_USER_PASSWORD: ${FTP_USER_PASSWORD}
FTP_PORT: ${FTP_PORT}

x-logs-volume: &logs-volume
type: volume
source: logs
@@ -110,12 +116,12 @@ services:
<<: *build-args-common
command: /run/secrets
environment:
<<: [*proxy-common, *pixl-common-env]
<<: [*proxy-common, *pixl-common-env, *ftp-host]
ORTHANC_NAME: "PIXL: Anon"
ORTHANC_USERNAME: ${ORTHANC_ANON_USERNAME}
ORTHANC_PASSWORD: ${ORTHANC_ANON_PASSWORD}
ORTHANC_ANON_AE_TITLE: ${ORTHANC_ANON_AE_TITLE}
ORTHANC_AUTOROUTE_ANON_TO_AZURE: ${ORTHANC_AUTOROUTE_ANON_TO_AZURE}
ORTHANC_AUTOROUTE_ANON_TO_ENDPOINT: ${ORTHANC_AUTOROUTE_ANON_TO_ENDPOINT}
ORTHANC_RAW_AE_TITLE: ${ORTHANC_RAW_AE_TITLE}
ORTHANC_RAW_DICOM_PORT: "4242"
ORTHANC_RAW_HOSTNAME: "orthanc-raw"
@@ -223,7 +229,7 @@ services:
args:
<<: *build-args-common
environment:
<<: [*pixl-db, *emap-db, *proxy-common, *pixl-common-env, *pixl-rabbit-mq]
<<: [*pixl-db, *emap-db, *proxy-common, *pixl-common-env, *pixl-rabbit-mq, *ftp-host]
AZURE_CLIENT_ID: ${PIXL_EHR_API_AZ_CLIENT_ID}
AZURE_CLIENT_SECRET: ${PIXL_EHR_API_AZ_CLIENT_SECRET}
AZURE_TENANT_ID: ${PIXL_EHR_API_AZ_TENANT_ID}
26 changes: 26 additions & 0 deletions docs/services/ftp-server.md
@@ -0,0 +1,26 @@
# FTPS server

Currently, we can only upload files to the Data Safe Haven (DSH) through an
[FTPS](https://en.wikipedia.org/wiki/FTPS) connection.

The [`core.upload`](../../pixl_core/src/core/upload.py) module implements functionality to upload
DICOM tags and parquet files to an **FTPS server**. This requires the following environment
variables to be set:

- `FTP_HOST`: URL to the FTPS server
- `FTP_PORT`: port on which the FTPS server is listening
- `FTP_USER_NAME`: name of user with access to the FTPS server
- `FTP_USER_PASSWORD`: password for the authorised user

We provide mock values for these for the unit tests (see
[`./tests/conftest.py`](./tests/conftest.py)). When running in production, these should be defined
in the `.env` file (see [the example](../.env.sample)).

For the `pixl_core` unit tests and the system test, we spin up an FTPS server with a Docker
container, defined in [`test/dummy-services/ftp-server`](../../test/dummy-services/ftp-server/) and
set the necessary environment variables in [`.env.test`](../../test/.env.test).

## FTPS test server

We provide a Docker container to spin up a test FTPS server. The documentation for this can be found
in [`test/README.md`](../../test/README.md).
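The diff to `core/upload.py` further down relies on an `ImplicitFtpTls` helper class, because Python's `ftplib.FTP_TLS` only speaks *explicit* FTPS out of the box. A minimal sketch of the usual recipe for implicit FTPS — wrapping the control socket in TLS as soon as it is assigned — might look like this (an illustration of the technique, not necessarily the exact PIXL implementation):

```python
import ftplib
import ssl


class ImplicitFtpTls(ftplib.FTP_TLS):
    """FTP_TLS subclass for *implicit* FTPS: the control channel is wrapped
    in TLS as soon as the socket is created, rather than after an AUTH TLS
    command as in explicit FTPS."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._sock = None

    @property
    def sock(self):
        """Return the current (possibly TLS-wrapped) socket."""
        return self._sock

    @sock.setter
    def sock(self, value):
        """Wrap any newly assigned plain socket in TLS before storing it."""
        if value is not None and not isinstance(value, ssl.SSLSocket):
            value = self.context.wrap_socket(value)
        self._sock = value
```

A caller would then `connect()`, `login()` and call `prot_p()` to secure the data channel, as `_connect_to_ftp` does in the `pixl_core/src/core/upload.py` diff later on.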
85 changes: 59 additions & 26 deletions orthanc/orthanc-anon/plugin/pixl.py
@@ -28,6 +28,7 @@
from io import BytesIO
from pathlib import Path
from time import sleep
from typing import TYPE_CHECKING

import requests
import yaml
@@ -38,14 +39,18 @@
import orthanc
import pixl_dcmd

if TYPE_CHECKING:
from typing import Any

ORTHANC_USERNAME = config("ORTHANC_USERNAME")
ORTHANC_PASSWORD = config("ORTHANC_PASSWORD")
ORTHANC_URL = "http://localhost:8042"

logger = logging.getLogger(__name__)


def AzureAccessToken():
"""
Send payload to oath2/token url and
return the response
"""
@@ -69,7 +74,6 @@ def AzureAccessToken():

def AzureDICOMTokenRefresh():
"""
Refresh Azure DICOM token
If this fails then wait 30s and try again
If successful then access_token can be used in
@@ -141,7 +145,7 @@ def SendViaStow(resourceId):

payload = {"Resources": [resourceId], "Synchronous": False}

logging.info("Payload: %s", payload)
logger.info("Payload: %s", payload)

try:
requests.post(
@@ -160,51 +164,80 @@ def SendViaFTPS(resourceId: str) -> None:
Makes a POST API call to upload the resource to an FTPS server
using orthanc credentials as authorisation
"""
# Query orthanc-anon for the study
msg = f"Sending {resourceId} via FTPS"
logging.info(msg)
# Download zip archive of the DICOM resource
query = f"{ORTHANC_URL}/studies/{resourceId}/archive"
try:
response_study = requests.get(query, auth=(ORTHANC_USERNAME, ORTHANC_PASSWORD), timeout=10)
success_code = 200
if response_study.status_code != success_code:
msg = "Could not download archive of resource '%s'"
raise RuntimeError(msg, resourceId)
except requests.exceptions.RequestException:
orthanc.LogError(f"Failed to query'{resourceId}'")
fail_msg = "Could not download archive of resource '%s'"
response_study = _query(resourceId, query, fail_msg)

# get the zip content
zip_content = response_study.content
logging.info("Downloaded data for resource %s", resourceId)
logger.info("Downloaded data for resource %s", resourceId)

upload.upload_dicom_image(zip_content, resourceId)
upload.upload_dicom_image(BytesIO(zip_content), _get_patient_id(resourceId))
logger.info("Uploaded data to FTPS for resource %s", resourceId)


def ShouldAutoRoute():
def _get_patient_id(resourceId: str) -> str:
"""
Queries the Orthanc instance to get the PatientID for a given resource.
When anonymisation has been applied, the PatientID is the pseudo-anonymised ID.
"""
Checks whether ORTHANC_AUTOROUTE_ANON_TO_AZURE environment variable is
query = f"{ORTHANC_URL}/studies/{resourceId}"
fail_msg = "Could not query study for resource '%s'"

response_study = _query(resourceId, query, fail_msg)
json_response = json.loads(response_study.content.decode())
return str(json_response["PatientMainDicomTags"]["PatientID"])


def _query(resourceId: str, query: str, fail_msg: str) -> requests.Response:
try:
response = requests.get(query, auth=(ORTHANC_USERNAME, ORTHANC_PASSWORD), timeout=10)
success_code = 200
if response.status_code != success_code:
raise RuntimeError(fail_msg, resourceId)
except requests.exceptions.RequestException as request_exception:
orthanc.LogError(f"Failed to query'{resourceId}'")
raise SystemExit from request_exception
else:
return response


def ShouldAutoRoute() -> bool:
"""
Checks whether ORTHANC_AUTOROUTE_ANON_TO_ENDPOINT environment variable is
set to true or false
"""
return os.environ.get("ORTHANC_AUTOROUTE_ANON_TO_AZURE", "false").lower() == "true"
logger.debug("Checking value of autoroute")
return os.environ.get("ORTHANC_AUTOROUTE_ANON_TO_ENDPOINT", "false").lower() == "true"


def _azure_available() -> bool:
# Check if AZ_DICOM_ENDPOINT_CLIENT_ID is set
return config("AZ_DICOM_ENDPOINT_CLIENT_ID", default="") != ""

def OnChange(changeType, level, resource) -> None: # noqa: ARG001

def OnChange(changeType, level, resource): # noqa: ARG001
"""
Three ChangeTypes included in this function:
- If a study if stable and if ShouldAutoRoute returns true
then SendViaStow is called
- If a study is stable and if ShouldAutoRoute returns true
then SendViaFTPS is called
- If orthanc has started then message added to Orthanc LogWarning
and AzureDICOMTokenRefresh called
- If orthanc has stopped and TIMER is not none then message added
to Orthanc LogWarning and TIMER cancelled
"""
if not ShouldAutoRoute():
return

if changeType == orthanc.ChangeType.STABLE_STUDY and ShouldAutoRoute():
print("Stable study: %s" % resource) # noqa: T201
SendViaStow(resource)
msg = f"Stable study: {resource}"
logger.info(msg)
SendViaFTPS(resource)

if changeType == orthanc.ChangeType.ORTHANC_STARTED:
if changeType == orthanc.ChangeType.ORTHANC_STARTED and _azure_available():
orthanc.LogWarning("Starting the scheduler")
AzureDICOMTokenRefresh()
elif changeType == orthanc.ChangeType.ORTHANC_STOPPED:
@@ -213,13 +246,13 @@ def OnChange(changeType, level, resource) -> None:  # noqa: ARG001
TIMER.cancel()


def OnHeartBeat(output, uri, **request): # noqa: ARG001
def OnHeartBeat(output, uri, **request) -> Any: # noqa: ARG001
"""Extends the REST API by registering a new route in the REST API"""
orthanc.LogWarning("OK")
output.AnswerBuffer("OK\n", "text/plain")


def ReceivedInstanceCallback(receivedDicom, origin):
def ReceivedInstanceCallback(receivedDicom: bytes, origin: str) -> Any:
"""Modifies a DICOM instance received by Orthanc and applies anonymisation."""
if origin == orthanc.InstanceOrigin.REST_API:
orthanc.LogWarning("DICOM instance received from the REST API")
42 changes: 29 additions & 13 deletions pixl_core/README.md
@@ -6,13 +6,14 @@ upstream services.

Specifically, it defines:

- The [Token buffer](#token-buffer) for rate limiting requests to the upstream services
- The [RabbitMQ queue](#patient-queue) implementation shared by the EHR and Imaging APIs
- The PIXL `postgres` internal database for storing exported images and extracts from the messages
processed by the CLI driver
- The [`ParquetExport`](./src/core/exports.py) class for exporting OMOP and EMAP extracts to parquet files
- Handling of [uploads over FTPS](./src/core/upload.py), used to transfer images and parquet files
to the DSH (Data Safe Haven)
- The [Token buffer](#token-buffer) for rate limiting requests to the upstream services
- The [RabbitMQ queue](#patient-queue) implementation shared by the EHR and Imaging APIs
- The PIXL `postgres` internal database for storing exported images and extracts from the messages
processed by the CLI driver
- The [`ParquetExport`](./src/core/exports.py) class for exporting OMOP and EMAP extracts to
parquet files
- Handling of [uploads over FTPS](./src/core/upload.py), used to transfer images and parquet files
to the DSH (Data Safe Haven)

## Installation

@@ -33,9 +34,9 @@ The token buffer is needed to limit the download rate for images from PAX/VNA. C
suggests that a rate limit of five images per second should be sufficient, however that may have to
be altered dynamically through command line interaction.

The current implementation of the token buffer uses the [token bucket implementation from
Falconry](https://github.com/falconry/token-bucket/). Furthermore, the token buffer is not set up as
a service as it is only needed for the image download rate.
The current implementation of the token buffer uses the
[token bucket implementation from Falconry](https://github.com/falconry/token-bucket/). Furthermore,
the token buffer is not set up as a service as it is only needed for the image download rate.
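As a sketch of the underlying algorithm (not the Falconry implementation itself), a token bucket that admits five images per second can be written in a few lines of plain Python:

```python
import time


class TokenBucket:
    """Minimal token bucket: holds up to `capacity` tokens, refilled at `rate` per second."""

    def __init__(self, rate: float, capacity: int) -> None:
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.last_refill = time.monotonic()

    def consume(self, n: int = 1) -> bool:
        """Take `n` tokens if available; return False (i.e. rate-limited) otherwise."""
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last_refill) * self.rate)
        self.last_refill = now
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False


bucket = TokenBucket(rate=5, capacity=5)  # five images per second
allowed = [bucket.consume() for _ in range(10)]
```

In a burst of ten back-to-back requests, only the first five succeed; subsequent requests must wait for the bucket to refill.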

## Patient queue

@@ -45,7 +46,8 @@ different PIXL services. Currently, we define two queues:
1. `pacs` for downloading and de-identifying images
2. `ehr` for downloading and de-identifying the EHR data

The image anonymisation will be triggered automatically once the image has been downloaded to the raw Orthanc server.
The image anonymisation will be triggered automatically once the image has been downloaded to the
raw Orthanc server.

### RabbitMQ

@@ -54,8 +56,8 @@ RabbitMQ is used for the queue implementation.
The client of choice for RabbitMQ at this point in time is
[pika](https://pika.readthedocs.io/en/stable/), which provides both a synchronous and asynchronous
way of transferring messages. The former is geared towards high data throughput whereas the latter
is geared towards stability. The asynchronous mode of transferring messages is a lot more complex
as it is based on the [asyncio event loop](https://docs.python.org/3/library/asyncio-eventloop.html).
is geared towards stability. The asynchronous mode of transferring messages is a lot more complex as
it is based on the [asyncio event loop](https://docs.python.org/3/library/asyncio-eventloop.html).
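Conceptually, the two named queues route work as in this in-process stand-in built on the standard library's `queue` module (illustration only — PIXL publishes to RabbitMQ via pika, and `publish`/`consume_all` are hypothetical names):

```python
import queue

# Conceptual stand-ins for the two RabbitMQ queues
queues = {"pacs": queue.Queue(), "ehr": queue.Queue()}


def publish(queue_name: str, message: str) -> None:
    """Route a message to the named queue (loose analogue of pika's basic_publish)."""
    queues[queue_name].put(message)


def consume_all(queue_name: str) -> list:
    """Drain the named queue one message at a time, as a consumer callback would."""
    messages = []
    q = queues[queue_name]
    while not q.empty():
        messages.append(q.get())
    return messages


publish("pacs", "download-study-123")
publish("ehr", "extract-ehr-123")
```

The point of the sketch is the routing: image work and EHR work never share a queue, so the two consumers can be scaled and rate-limited independently.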

### OMOP ES files

@@ -85,3 +87,17 @@ for convenience `latest` is a symlink to the most recent extract.
│ └── public
└── latest -> all_extracts/2023-12-13t16-22-40
```
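A timestamped layout with a `latest` symlink, as in the tree above, can be produced with `pathlib` alone; this is a sketch of the pattern (the helper name is illustrative, not the actual `ParquetExport` code):

```python
import datetime
import tempfile
from pathlib import Path


def make_extract_dir(root: Path, now: datetime.datetime) -> Path:
    """Create all_extracts/<timestamp>/ under root and repoint `latest` at it."""
    slug = now.strftime("%Y-%m-%dt%H-%M-%S")
    extract = root / "all_extracts" / slug
    extract.mkdir(parents=True, exist_ok=True)
    latest = root / "latest"
    if latest.is_symlink():
        latest.unlink()
    # Relative symlink, matching the tree above: latest -> all_extracts/<timestamp>
    latest.symlink_to(extract.relative_to(root))
    return extract


root = Path(tempfile.mkdtemp())
created = make_extract_dir(root, datetime.datetime(2023, 12, 13, 16, 22, 40))
```

Replacing the symlink atomically on each export means consumers can always read `latest` without knowing the timestamp of the most recent extract.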

## Uploading to an FTPS server

The `core.upload` module implements functionality to upload DICOM tags and parquet files to an
**FTPS server**. This requires the following environment variables to be set:

- `FTP_HOST`: URL to the FTPS server
- `FTP_PORT`: port on which the FTPS server is listening
- `FTP_USER_NAME`: name of user with access to the FTPS server
- `FTP_USER_PASSWORD`: password for the authorised user

We provide mock values for these for the unit tests (see
[`./tests/conftest.py`](./tests/conftest.py)). When running in production, these should be defined
in the `.env` file (see [the example](../.env.sample)).
21 changes: 15 additions & 6 deletions pixl_core/src/core/upload.py
@@ -68,7 +72,12 @@ def upload_dicom_image(zip_content: BinaryIO, pseudo_anon_id: str) -> None:
logger.debug("Running %s", command)

# Store the file using a binary handler
ftp.storbinary(command, zip_content)
try:
ftp.storbinary(command, zip_content)
except ftplib.all_errors as ftp_error:
ftp.quit()
error_msg = "Failed to run STOR command '%s': '%s'"
raise ConnectionError(error_msg, command, ftp_error) from ftp_error

# Close the FTP connection
ftp.quit()
@@ -82,13 +87,17 @@ def _connect_to_ftp() -> FTP_TLS:
ftp_host = os.environ["FTP_HOST"]
ftp_port = os.environ["FTP_PORT"] # FTPS usually uses port 21
ftp_user = os.environ["FTP_USER_NAME"]
ftp_password = os.environ["FTP_USER_PASS"]
ftp_password = os.environ["FTP_USER_PASSWORD"]

# Connect to the server and login
ftp = ImplicitFtpTls()
ftp.connect(ftp_host, int(ftp_port))
ftp.login(ftp_user, ftp_password)
ftp.prot_p()
try:
ftp = ImplicitFtpTls()
ftp.connect(ftp_host, int(ftp_port))
ftp.login(ftp_user, ftp_password)
ftp.prot_p()
except ftplib.all_errors as ftp_error:
error_msg = "Failed to connect to FTPS server: '%s'"
raise ConnectionError(error_msg, ftp_error) from ftp_error
return ftp


