Skip to content

Commit

Permalink
v2.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
migraf committed Apr 3, 2023
2 parents b2906e9 + ebbaf43 commit a6d14a9
Show file tree
Hide file tree
Showing 8 changed files with 3,501 additions and 104 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,7 @@ docs/_build/
$HOME
rabbitmq/
venv/
.env
.env
**.pem
**.key
station_data
24 changes: 6 additions & 18 deletions airflow/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM ubuntu
FROM python:3.10-bullseye
MAINTAINER [email protected]

ENV DEBIAN_FRONTEND=noninteractive
Expand All @@ -16,31 +16,19 @@ RUN apt -y update && apt-get -y install software-properties-common && \
netcat \
locales \
curl && \
# install python
add-apt-repository ppa:deadsnakes/ppa && \
apt-get install -y python3.8 && apt install python-is-python3 && apt install -y python3-pip && \
# # install python
# apt install python-is-python3 && apt install -y python3-pip && \
rm -rf /var/lib/apt/lists


# Install container diff library
RUN curl -LO https://storage.googleapis.com/container-diff/latest/container-diff-linux-amd64 && \
install container-diff-linux-amd64 /usr/local/bin/container-diff

# install airflow
RUN pip install "apache-airflow[celery,crypto,hashicorp,password,postgres,redis]==2.2.4" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.4/constraints-3.8.txt"
RUN pip install "apache-airflow[celery,crypto,hashicorp,password,postgres,redis]==2.5.1" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-3.10.txt"
# install train container library
RUN pip install --force-reinstall --no-cache-dir -v "pht-train-container-library==2.0.2"

RUN mkdir /opt/train_data
COPY requirements.txt /tmp/requirements.txt


RUN pip install -r /tmp/requirements.txt


COPY airflow.cfg /root/airflow/airflow.cfg
COPY ./dags /root/airflow/dags
# install train container library

RUN pip install git+https://github.com/PHT-Medic/train-container-library.git

COPY entrypoint.sh /root/entrypoint.sh
RUN chmod +x /root/entrypoint.sh
Expand Down
141 changes: 66 additions & 75 deletions airflow/dags/DAG_PHT_run_train.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,35 @@
import sys
import os
import os.path
from pprint import pprint

import docker
import docker.types
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.utils.dates import days_ago

import docker
import docker.types
from docker.errors import APIError

from airflow.utils.dates import days_ago
from loguru import logger
import logging

from train_lib.docker_util.docker_ops import extract_train_config, extract_query_json
from train_lib.security.protocol import SecurityProtocol
from train_lib.clients import PHTFhirClient
from train_lib.docker_util.validate_master_image import validate_train_image
from train_lib.security.train_config import TrainConfig

# configure logging
class PropagateHandler(logging.Handler):
    """Bridge handler that forwards records into the stdlib logging tree.

    Registered on the loguru logger (see the ``logger.add`` call below the
    class) so that every message emitted through loguru is re-dispatched to
    the ``airflow.task`` logger and therefore shows up in Airflow task logs.
    """

    def emit(self, record):
        # Hand the fully-formed record to Airflow's task logger; that
        # logger's own handlers/formatters decide where it gets written.
        airflow_task_logger = logging.getLogger("airflow.task")
        airflow_task_logger.handle(record)

# Route every loguru message through the standard logging machinery so it
# is captured by Airflow's task log handlers alongside native log output.
# "{message}" keeps loguru from double-formatting: the record text is passed
# through unchanged and Airflow applies its own formatting.
logger.add(PropagateHandler(), format="{message}")
# task_logger = logging.getLogger("airflow.task")
# logger.add(task_logger.ha)


default_args = {
'owner': 'airflow',
'depends_on_past': False,
Expand All @@ -42,7 +55,7 @@
}


@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['pht', 'train'])
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['pht', 'train'], )
def run_pht_train():
@task()
def get_train_image_info():
Expand Down Expand Up @@ -107,27 +120,28 @@ def pull_docker_image(train_state):
return train_state

@task()
def extract_config_and_query(train_state):
def extract_config(train_state):
config = extract_train_config(train_state["img"])
train_state["config"] = config.dict(by_alias=True)

# try to extract the query json if it exists under the specified path
try:
query = json.loads(extract_query_json(train_state["img"]))
train_state["query"] = query
except Exception as e:
print(e)
train_state["query"] = None
print("No query file found ")

return train_state

# @task()
# def validate_against_master_image(train_state):
# master_image = train_state["config"]["master_image"]
# img = train_state["img"]
# validate_train_image(train_img=img, master_image=master_image)
# return train_state
@task()
def validate_master_image(train_state):
    """Validate the pulled train image against its declared master image.

    Reads the master image source address and tag from the train config,
    pulls the master image, and delegates the actual layer comparison to
    ``validate_train_image``. Returns ``train_state`` unchanged on success.

    Raises:
        ValueError: when the config contains no master image source.
    """
    master_image_source = train_state["config"]["source"].get("address", None)
    master_image_tag = train_state["config"]["source"].get("tag", "latest")

    # Guard BEFORE using the source: the original concatenated
    # master_image_source + ":" + tag (and pulled the image) first, which
    # would raise a TypeError on a missing address instead of this
    # intentional, descriptive ValueError.
    if not master_image_source:
        raise ValueError("No master image source found in config")

    master_image = master_image_source + ":" + master_image_tag
    print(f"Validating against master image: {master_image}")

    client = docker.from_env()
    client.images.pull(repository=master_image_source, tag=master_image_tag)

    img = train_state["img"]
    validate_train_image(train_image_name=img, master_image_name=master_image)
    return train_state

@task()
def pre_run_protocol(train_state):
Expand All @@ -140,9 +154,25 @@ def pre_run_protocol(train_state):

return train_state

@task()
def extract_query(train_state):
    """Attempt to extract an optional query file from the train image.

    On success the parsed JSON is stored under ``train_state["query"]``;
    on any failure the key is set to None and execution continues (trains
    without a FHIR query are valid).
    """
    # try to extract the query json if it exists under the specified path
    query = None
    try:
        query = json.loads(extract_query_json(train_state["img"]))
        train_state["query"] = query
        print("################### Query file was extracted ###################")
        pprint(query)
    except Exception as e:

        train_state["query"] = None
        print("################### Query file could not be extracted ###################")
        # NOTE(review): query is still None here unless json.loads succeeded
        # and a later line failed, so this print usually shows "None".
        print(query)
        print(e)
    return train_state

@task()
def execute_query(train_state):
print(train_state)
query = train_state.get("query", None)
if query:
print("Query found, setting up connection to FHIR server")
Expand Down Expand Up @@ -235,7 +265,7 @@ def execute_container(train_state):
network_disabled=True,
stderr=True,
stdout=True,
device_requests=[device_request] if device_request else []
device_requests=[device_request] if device_request else None
)
# If the container is already in use remove it
except APIError as e:
Expand All @@ -248,10 +278,11 @@ def execute_container(train_state):
print(f"logs_container_start({logs})logs_end")
exit_code = container_output["StatusCode"]


if exit_code != 0:
print(container_output)
raise ValueError(f"The train execution returned a non zero exit code: {exit_code}")

#
def _copy(from_cont, from_path, to_cont, to_path):
"""
Copies a file from one container to another container
Expand All @@ -267,13 +298,22 @@ def _copy(from_cont, from_path, to_cont, to_path):
base_image = ':'.join([train_state["repository"], 'base'])
to_container = client.containers.create(base_image)
# Copy results to base image
_copy(from_cont=container,
from_path="/opt/pht_train",
to_cont=to_container,
to_path="/opt/pht_train")
_copy(from_cont=container,
from_path="/opt/pht_results",
to_cont=to_container,
to_path="/opt/pht_results")
_copy(from_cont=container,
from_path="/opt/train_config.json",
to_cont=to_container,
to_path="/opt/train_config.json")

to_container.commit(repository=train_state["repository"], tag=train_state["tag"])
container.remove(v=True, force=True)
to_container.remove(v=True, force=True)
if exit_code != 0:
raise ValueError(f"The train execution returned a non zero exit code: {exit_code}")

Expand All @@ -292,55 +332,6 @@ def post_run_protocol(train_state):

return train_state

@task()
def rebase(train_state):
    """Rebase the executed train onto its ``:base`` image.

    Copies the results and train_config.json from the executed train
    container into a fresh container created from the base image, then
    commits that container under the train's repository/tag.
    NOTE(review): this diff suggests the task was removed in this commit —
    the new execute_container step performs the copy/commit itself.
    """
    base_image = ':'.join([train_state["repository"], 'base'])
    client = docker.from_env(timeout=120)
    to_container = client.containers.create(base_image)
    # NOTE(review): updated_tag is only used in the log message below; the
    # commit call reads train_state["tag"] directly.
    updated_tag = train_state["tag"]

    def _copy(from_cont, from_path, to_cont, to_path):
        """
        Copies a file from one container to another container
        :param from_cont: source container object
        :param from_path: absolute path inside the source container
        :param to_cont: destination container object
        :param to_path: absolute path inside the destination container
        :return: None
        """
        tar_stream, _ = from_cont.get_archive(from_path)
        to_cont.put_archive(os.path.dirname(to_path), tar_stream)

    from_container = client.containers.create(train_state["img"])

    # Copy results to base image
    _copy(from_cont=from_container,
          from_path="/opt/pht_results",
          to_cont=to_container,
          to_path="/opt/pht_results")

    # Hardcoded copying of train_config.json
    _copy(from_cont=from_container,
          from_path="/opt/train_config.json",
          to_cont=to_container,
          to_path="/opt/train_config.json")

    print('Copied files into baseimage')

    print(f'Creating image: {train_state["repository"]}:{updated_tag}')
    print(type(to_container))
    # Rebase the train
    try:
        # NOTE(review): the returned image object is never used.
        img = to_container.commit(repository=train_state["repository"], tag=train_state["tag"])
        # remove executed containers -> only images needed from this point
        print('Removing containers')
        to_container.remove()
        from_container.remove()
        return train_state
    except Exception as err:
        # NOTE(review): printing and calling sys.exit() hides the traceback
        # from Airflow; re-raising would mark the task failed with context.
        print(err)
        sys.exit()

@task()
def push_train_image(train_state):
client = docker.from_env()
Expand All @@ -361,13 +352,13 @@ def push_train_image(train_state):

train_state = get_train_image_info()
train_state = pull_docker_image(train_state)
train_state = extract_config_and_query(train_state)
# train_state = validate_against_master_image(train_state)
train_state = extract_config(train_state)
train_state = validate_master_image(train_state)
train_state = pre_run_protocol(train_state)
train_state = extract_query(train_state)
train_state = execute_query(train_state)
train_state = execute_container(train_state)
train_state = post_run_protocol(train_state)
train_state = rebase(train_state)
push_train_image(train_state)


Expand Down
4 changes: 2 additions & 2 deletions airflow/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ set -e

if [ "$1" = 'webserver' ]; then
airflow db init
airflow users create --username $AIRFLOW_USER --firstname Station --lastname Admin --role Admin --email [email protected] -p $AIRFLOW_PW
airflow users create --username $AIRFLOW_USER --firstname Station --lastname Admin --role Admin --email [email protected] --password $AIRFLOW_PW
airflow scheduler &
exec airflow webserver

Expand All @@ -13,7 +13,7 @@ elif [ "$1" = 'worker' ]; then

elif [ "$1" = 'init' ]; then
airflow db init
airflow users create --username $AIRFLOW_USER --firstname FIRST_NAME --lastname LAST_NAME --role Admin --email [email protected] -p $AIRFLOW_PW
airflow users create --username $AIRFLOW_USER --firstname FIRST_NAME --lastname LAST_NAME --role Admin --email [email protected] --password $AIRFLOW_PW
# airflow connections add 'station_db' --conn-type 'postgres' --conn-login 'admin' --conn-password 'admin' --conn-host 'postgres' --conn-port '5432' --conn-schema "pht_station_${STATION_ID}"
# exec station_airflow worker
else
Expand Down
8 changes: 0 additions & 8 deletions airflow/requirements.txt

This file was deleted.

1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ services:
# Station Private Key environment variable needs to be set and registered
- $STATION_PRIVATE_KEY_PATH:/opt/private_key.pem
- $STATION_DATA_DIR:$AIRFLOW_DATA_DIR
restart: unless-stopped
depends_on:
- pg_station
environment:
Expand Down
Loading

0 comments on commit a6d14a9

Please sign in to comment.