Merge pull request #24 from Ferlab-Ste-Justine/cqdg-642
feat: CQDG-642 Use docker image from variables
jecos authored Mar 12, 2024
2 parents f0f96fc + 771964f commit 051f927
Showing 34 changed files with 1,201 additions and 1,307 deletions.
41 changes: 27 additions & 14 deletions dags/arranger.py
@@ -1,23 +1,36 @@
from airflow import DAG
from airflow.models import Variable
from datetime import datetime
from lib.config import env, Env, K8sContext
from lib.operators.arranger import ArrangerOperator
from lib.config import keycloak_client_secret_name, env, Env, kube_config, es_url, es_port, es_credentials_secret_name,es_credentials_secret_key_password, es_credentials_secret_key_username
from lib.operators.arranger import ArrangerConfig

arranger_config = ArrangerConfig(
es_url = es_url,
node_environment= 'production' if env == Env.PROD else env,
kube_config = kube_config,
image = Variable.get('arranger_image'),
es_port = es_port,
es_cert_secret_name = 'opensearch-ca-certificate',
es_credentials_secret_name = es_credentials_secret_name,
es_credentials_secret_key_username = es_credentials_secret_key_username,
es_credentials_secret_key_password = es_credentials_secret_key_password,
keycloak_client_secret_name = keycloak_client_secret_name,
)

def arranger_task():
return arranger_config.args(
'--experimental-modules=node',
'--es-module-specifier-resolution=node',
'admin/run.mjs') \
.operator(
task_id='arranger_update_project',
name='etl-publish-arranger-update-project',
)
with DAG(
dag_id='update_arranger_project',
dag_id='update-arranger-project',
start_date=datetime(2022, 1, 1),
schedule_interval=None,
params={},
) as dag:
arranger_task()


arranger_update_project = ArrangerOperator(
task_id='arranger_update_project',
name='etl-publish-arranger-update-project',
k8s_context=K8sContext.DEFAULT,
cmds=['node',
'--experimental-modules=node',
'--es-module-specifier-resolution=node',
'admin/run.mjs',
],
)
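
With this change the Arranger image is no longer baked into the operator but read from an Airflow Variable via Variable.get('arranger_image'). Below is a minimal sketch of seeding that Variable; the image reference is a placeholder, not the project's actual image, and a real deployment would typically set it through the Airflow UI, CLI, or an AIRFLOW_VAR_ARRANGER_IMAGE environment variable.

# Sketch only: seed the Variable consumed by Variable.get('arranger_image') above.
# The image reference below is a placeholder value.
from airflow.models import Variable

Variable.set('arranger_image', 'ghcr.io/example/arranger:latest')
assert Variable.get('arranger_image') == 'ghcr.io/example/arranger:latest'
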
32 changes: 17 additions & 15 deletions dags/create_tables.py
@@ -6,7 +6,7 @@
from airflow.exceptions import ParamValidationError
from airflow.models import Param, TaskInstance, DagRun

from lib.config import default_config_file, default_params, K8sContext, variant_task_jar
from lib.config import spark_small_conf, default_config_file, default_params, variant_jar, etl_base_config
from lib.operators.spark import SparkOperator

# Update default params
@@ -21,11 +21,12 @@
})

with DAG(
dag_id='create_tables',
dag_id='create-tables',
start_date=datetime(2022, 1, 1),
schedule_interval=None,
params=params,
) as dag:

@task(task_id='get_dataset_ids')
def get_dataset_ids(**kwargs) -> List[str]:
ti: TaskInstance = kwargs['ti']
@@ -41,7 +42,7 @@ def get_dataset_ids(**kwargs) -> List[str]:


class CreateTableAndView(SparkOperator):
template_fields = SparkOperator.template_fields + ('arguments', 'dataset_ids',)
template_fields = [*SparkOperator.template_fields,'arguments', 'dataset_ids']

def __init__(self,
dataset_ids,
@@ -55,15 +56,16 @@ def execute(self, **kwargs):
self.arguments = self.arguments + self.dataset_ids
super().execute(**kwargs)


CreateTableAndView(task_id='create_table_and_view',
name='create-table-and-view',
k8s_context=K8sContext.DEFAULT,
spark_jar=variant_task_jar,
spark_class='bio.ferlab.datalake.spark3.hive.CreateTableAndView',
spark_config='etl-task-small',
arguments=['--config', default_config_file,
'--steps', 'default',
'--app-name', 'create_table_and_view',
],
dataset_ids=get_dataset_ids())
etl_base_config.add_spark_conf(spark_small_conf) \
.with_spark_class('bio.ferlab.datalake.spark3.hive.CreateTableAndView') \
.with_spark_jar(variant_jar) \
.args('--config', default_config_file,
'--steps', 'default',
'--app-name', 'create_table_and_view'
).operator(
class_to_instantiate=CreateTableAndView,
task_id = 'create_table_and_view',
name = 'create-table-and-view',
dataset_ids=get_dataset_ids()
)

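The rewritten CreateTableAndView keeps 'arguments' and 'dataset_ids' in template_fields so that Airflow renders them, resolving the XComArg returned by get_dataset_ids(), before execute() runs and appends the ids to the Spark arguments. A minimal, self-contained sketch of that pattern, using a hypothetical operator rather than the repo's SparkOperator:

# Sketch only: illustrates templated fields resolving an upstream @task result.
# EchoArgsOperator is hypothetical; it is not part of this repository.
from airflow.models.baseoperator import BaseOperator


class EchoArgsOperator(BaseOperator):
    # Fields listed here are rendered by Airflow before execute(), so an
    # XComArg passed as dataset_ids arrives as a plain list at runtime.
    template_fields = ('arguments', 'dataset_ids')

    def __init__(self, arguments=None, dataset_ids=None, **kwargs):
        super().__init__(**kwargs)
        self.arguments = arguments or []
        self.dataset_ids = dataset_ids

    def execute(self, context):
        # Both fields are rendered by now; mirror CreateTableAndView by
        # appending the resolved dataset ids to the argument list.
        self.log.info('arguments: %s', [*self.arguments, *(self.dataset_ids or [])])

A usage along the lines of EchoArgsOperator(task_id='echo', arguments=['--config', default_config_file], dataset_ids=get_dataset_ids()) would wire the upstream task in exactly as the diff does.
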
78 changes: 39 additions & 39 deletions dags/es_templates_update.py
@@ -1,8 +1,7 @@
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s

from lib import config
from lib.config import env
from lib.config import datalake_bucket, kube_config, aws_endpoint, aws_secret_name, aws_secret_access_key, aws_secret_secret_key

script = f"""
#!/bin/bash
@@ -33,46 +32,47 @@
curl https://raw.githubusercontent.com/Ferlab-Ste-Justine/etl-cqdg-portal/master/index-task/src/main/resources/templates/template_gene_suggestions.json --output ./templates/template_gene_suggestions.json
echo Copy templates ...
mc cp ./templates/template_study_centric.json myminio/cqdg-{env}-app-datalake/templates/template_study_centric.json
mc cp ./templates/template_file_centric.json myminio/cqdg-{env}-app-datalake/templates/template_file_centric.json
mc cp ./templates/template_participant_centric.json myminio/cqdg-{env}-app-datalake/templates/template_participant_centric.json
mc cp ./templates/template_biospecimen_centric.json myminio/cqdg-{env}-app-datalake/templates/template_biospecimen_centric.json
mc cp ./templates/template_study_centric.json myminio/{datalake_bucket}/templates/template_study_centric.json
mc cp ./templates/template_file_centric.json myminio/{datalake_bucket}/templates/template_file_centric.json
mc cp ./templates/template_participant_centric.json myminio/{datalake_bucket}/templates/template_participant_centric.json
mc cp ./templates/template_biospecimen_centric.json myminio/{datalake_bucket}/templates/template_biospecimen_centric.json
mc cp ./templates/template_variant_centric.json myminio/cqdg-{env}-app-datalake/templates/template_variant_centric.json
mc cp ./templates/template_gene_centric.json myminio/cqdg-{env}-app-datalake/templates/template_gene_centric.json
mc cp ./templates/template_variant_suggestions.json myminio/cqdg-{env}-app-datalake/templates/template_variant_suggestions.json
mc cp ./templates/template_gene_suggestions.json myminio/cqdg-{env}-app-datalake/templates/template_gene_suggestions.json
mc cp ./templates/template_variant_centric.json myminio/{datalake_bucket}/templates/template_variant_centric.json
mc cp ./templates/template_gene_centric.json myminio/{datalake_bucket}/templates/template_gene_centric.json
mc cp ./templates/template_variant_suggestions.json myminio/{datalake_bucket}/templates/template_variant_suggestions.json
mc cp ./templates/template_gene_suggestions.json myminio/{datalake_bucket}/templates/template_gene_suggestions.json
"""

es_templates_update = KubernetesPodOperator(
task_id='es_templates_update',
name='es-templates-update',
image="alpine:3.14",
is_delete_operator_pod=True,
cmds=["sh", "-cx"],
arguments=[script],
namespace=config.k8s_namespace,
env_vars=[
k8s.V1EnvVar(
name='AWS_ENDPOINT',
value='https://objets.juno.calculquebec.ca',
),
k8s.V1EnvVar(
name='AWS_ACCESS_KEY_ID',
value_from=k8s.V1EnvVarSource(
secret_key_ref=k8s.V1SecretKeySelector(
name='ceph-s3-credentials',
key='access',
),
def es_templates_update():
return KubernetesPodOperator(
task_id='es_templates_update',
name='es-templates-update',
image="alpine:3.14",
is_delete_operator_pod=True,
cmds=["sh", "-cx"],
arguments=[script],
namespace=kube_config.namespace,
env_vars=[
k8s.V1EnvVar(
name='AWS_ENDPOINT',
value=aws_endpoint,
),
),
k8s.V1EnvVar(
name='AWS_SECRET_ACCESS_KEY',
value_from=k8s.V1EnvVarSource(
secret_key_ref=k8s.V1SecretKeySelector(
name='ceph-s3-credentials',
key='secret',
k8s.V1EnvVar(
name='AWS_ACCESS_KEY_ID',
value_from=k8s.V1EnvVarSource(
secret_key_ref=k8s.V1SecretKeySelector(
name=aws_secret_name,
key=aws_secret_access_key,
),
),
),
), ]
)
k8s.V1EnvVar(
name='AWS_SECRET_ACCESS_KEY',
value_from=k8s.V1EnvVarSource(
secret_key_ref=k8s.V1SecretKeySelector(
name=aws_secret_name,
key=aws_secret_secret_key,
),
),
), ]
)
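
For reference, the hardcoded values removed above correspond to the new names imported from lib.config. The mapping below is a sketch inferred from the deleted lines; the actual definitions live in lib/config.py, which is not shown in this excerpt, and the env value is only an example.

# Sketch only: assumed lib/config values, inferred from the removed literals above.
env = 'qa'                                            # example environment, not from this diff
datalake_bucket = f'cqdg-{env}-app-datalake'          # replaces the per-line bucket name
aws_endpoint = 'https://objets.juno.calculquebec.ca'  # replaces the inlined AWS_ENDPOINT value
aws_secret_name = 'ceph-s3-credentials'               # k8s secret previously named inline
aws_secret_access_key = 'access'                      # key in the secret for AWS_ACCESS_KEY_ID
aws_secret_secret_key = 'secret'                      # key in the secret for AWS_SECRET_ACCESS_KEY
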
164 changes: 18 additions & 146 deletions dags/etl.py
@@ -2,12 +2,13 @@
from airflow.models.param import Param
from airflow.utils.task_group import TaskGroup
from datetime import datetime
from lib import config
from lib.operators.fhavro import FhavroOperator
from lib.config import env, K8sContext
from lib.operators.spark import SparkOperator
from lib.operators.arranger import ArrangerOperator
from lib.config import env, study_ids, release_id, es_port, es_url, etl_publish_config, project
from es_templates_update import es_templates_update
from etl_import import etl_import
from arranger import arranger_task
from etl_fhavro_export import fhavro_export
from etl_index import index_operator
from etl_prepare_index import prepare_index

with DAG(
dag_id='etl',
@@ -20,148 +21,19 @@
'es_port': Param('9200', type='string')
},
) as dag:
def release_id() -> str:
return '{{ params.release_id }}'


def study_ids() -> str:
return '{{ params.study_ids }}'


def project() -> str:
return '{{ params.project }}'


def es_port() -> str:
return '{{ params.es_port }}'


fhavro_export = FhavroOperator(
task_id='fhavro_export',
name='etl-fhavro_export',
k8s_context=K8sContext.DEFAULT,
cmds=['java',
'-cp',
'fhavro-export.jar',
'bio/ferlab/fhir/etl/FhavroExport',
release_id(), study_ids(), env
],
)

import_task = SparkOperator(
task_id='import_task',
name='etl-import-task',
k8s_context=K8sContext.DEFAULT,
spark_jar=config.spark_import_jar,
spark_class='bio.ferlab.fhir.etl.ImportTask',
spark_config='etl-task-small',
arguments=[f'config/{env}-{project()}.conf', 'default', release_id(), study_ids()],
)

prepare_index = SparkOperator(
task_id='prepare_index',
name='etl-prepare-index',
k8s_context=K8sContext.DEFAULT,
spark_jar=config.spark_prepare_index_jar,
spark_class='bio.ferlab.fhir.etl.PrepareIndex',
spark_config='etl-task-small',
arguments=[f'config/{env}-{project()}.conf', 'default', 'all', study_ids()],
)

with TaskGroup(group_id='index') as index:
study_centric = SparkOperator(
task_id='study_centric',
name='etl-index-study-centric',
k8s_context=K8sContext.DEFAULT,
spark_jar=config.spark_index_jar,
spark_class='bio.ferlab.fhir.etl.IndexTask',
spark_config='etl-task-small',
arguments=[release_id(), study_ids(), 'study_centric', env, project(), config.es_url, es_port()],
)

participant_centric = SparkOperator(
task_id='participant_centric',
name='etl-index-participant-centric',
k8s_context=K8sContext.DEFAULT,
spark_jar=config.spark_index_jar,
spark_class='bio.ferlab.fhir.etl.IndexTask',
spark_config='etl-task-small',
arguments=[release_id(), study_ids(), 'participant_centric', env, project(), config.es_url, es_port()],
)

file_centric = SparkOperator(
task_id='file_centric',
name='etl-index-file-centric',
k8s_context=K8sContext.DEFAULT,
spark_jar=config.spark_index_jar,
spark_class='bio.ferlab.fhir.etl.IndexTask',
spark_config='etl-task-small',
arguments=[release_id(), study_ids(), 'file_centric', env, project(), config.es_url, es_port()],
)

biospecimen_centric = SparkOperator(
task_id='biospecimen_centric',
name='etl-index-biospecimen-centric',
k8s_context=K8sContext.DEFAULT,
spark_jar=config.spark_index_jar,
spark_class='bio.ferlab.fhir.etl.IndexTask',
spark_config='etl-task-small',
arguments=[release_id(), study_ids(), 'biospecimen_centric', env, project(), config.es_url, es_port()],
)
study_centric >> participant_centric >> file_centric >> biospecimen_centric
index_operator('study') >> index_operator('participant') >> index_operator('file') >> index_operator('biospecimen')

with TaskGroup(group_id='publish') as publish:
study_centric = SparkOperator(
task_id='study_centric',
name='etl-publish-study-centric',
k8s_context=K8sContext.DEFAULT,
spark_jar=config.spark_publish_jar,
spark_class='bio.ferlab.fhir.etl.PublishTask',
spark_config='etl-task-small',
arguments=[config.es_url, es_port(), env, project(), release_id(), study_ids(), 'study_centric'],
)

participant_centric = SparkOperator(
task_id='participant_centric',
name='etl-publish-participant-centric',
k8s_context=K8sContext.DEFAULT,
spark_jar=config.spark_publish_jar,
spark_class='bio.ferlab.fhir.etl.PublishTask',
spark_config='etl-task-small',
arguments=[config.es_url, es_port(), env, project(), release_id(), study_ids(), 'participant_centric'],
)

file_centric = SparkOperator(
task_id='file_centric',
name='etl-publish-file-centric',
k8s_context=K8sContext.DEFAULT,
spark_jar=config.spark_publish_jar,
spark_class='bio.ferlab.fhir.etl.PublishTask',
spark_config='etl-task-small',
arguments=[config.es_url, es_port(), env, project(), release_id(), study_ids(), 'file_centric'],
)

biospecimen_centric = SparkOperator(
task_id='biospecimen_centric',
name='etl-publish-biospecimen-centric',
k8s_context=K8sContext.DEFAULT,
spark_jar=config.spark_publish_jar,
spark_class='bio.ferlab.fhir.etl.PublishTask',
spark_config='etl-task-small',
arguments=[config.es_url, es_port(), env, project(), release_id(), study_ids(), 'biospecimen_centric'],
)

study_centric >> participant_centric >> file_centric >> biospecimen_centric

arranger_update_project = ArrangerOperator(
task_id='arranger_update_project',
name='etl-publish-arranger-update-project',
k8s_context=K8sContext.DEFAULT,
cmds=['node',
'--experimental-modules=node',
'--es-module-specifier-resolution=node',
'admin/run.mjs'
],
)

fhavro_export >> import_task >> prepare_index >> es_templates_update >> index >> publish >> arranger_update_project
def publish_operator(name:str):
return etl_publish_config \
.args(es_url, es_port, env, project, release_id, study_ids, f'{name}_centric') \
.operator(
task_id=f'{name}_centric',
name=f'etl-publish-{name}-centric'
)

publish_operator('study') >> publish_operator('participant') >> publish_operator('file') >> publish_operator('biospecimen')

fhavro_export() >> etl_import() >> prepare_index() >> es_templates_update() >> index >> publish >> arranger_task()
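
The index task group now delegates to index_operator from etl_index, mirroring the publish_operator factory defined just above. The following is a hypothetical sketch of that factory's shape, inferred from publish_operator and from the deleted IndexTask arguments; the real etl_index.py is part of this commit but is not reproduced in this excerpt, and etl_index_config is an assumed name.

# Sketch only: assumed shape of etl_index.index_operator.
from lib.config import env, study_ids, release_id, es_port, es_url, project, etl_index_config  # etl_index_config is assumed


def index_operator(name: str):
    # Argument order mirrors the removed IndexTask calls:
    # release_id, study_ids, '<entity>_centric', env, project, es_url, es_port
    return etl_index_config \
        .args(release_id, study_ids, f'{name}_centric', env, project, es_url, es_port) \
        .operator(
            task_id=f'{name}_centric',
            name=f'etl-index-{name}-centric'
        )
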
