diff --git a/dags/arranger.py b/dags/arranger.py index d6e0f49..88bd836 100644 --- a/dags/arranger.py +++ b/dags/arranger.py @@ -1,23 +1,36 @@ from airflow import DAG +from airflow.models import Variable from datetime import datetime -from lib.config import env, Env, K8sContext -from lib.operators.arranger import ArrangerOperator +from lib.config import keycloak_client_secret_name, env, Env, kube_config, es_url, es_port, es_credentials_secret_name,es_credentials_secret_key_password, es_credentials_secret_key_username +from lib.operators.arranger import ArrangerConfig +arranger_config = ArrangerConfig( + es_url = es_url, + node_environment= 'production' if env == Env.PROD else env, + kube_config = kube_config, + image = Variable.get('arranger_image'), + es_port = es_port, + es_cert_secret_name = 'opensearch-ca-certificate', + es_credentials_secret_name = es_credentials_secret_name, + es_credentials_secret_key_username = es_credentials_secret_key_username, + es_credentials_secret_key_password = es_credentials_secret_key_password, + keycloak_client_secret_name = keycloak_client_secret_name, +) + +def arranger_task(): + return arranger_config.args( + '--experimental-modules=node', + '--es-module-specifier-resolution=node', + 'admin/run.mjs') \ + .operator( + task_id='arranger_update_project', + name='etl-publish-arranger-update-project', + ) with DAG( - dag_id='update_arranger_project', + dag_id='update-arranger-project', start_date=datetime(2022, 1, 1), schedule_interval=None, params={}, ) as dag: + arranger_task() - - arranger_update_project = ArrangerOperator( - task_id='arranger_update_project', - name='etl-publish-arranger-update-project', - k8s_context=K8sContext.DEFAULT, - cmds=['node', - '--experimental-modules=node', - '--es-module-specifier-resolution=node', - 'admin/run.mjs', - ], - ) diff --git a/dags/create_tables.py b/dags/create_tables.py index ad0d47c..7c1462b 100644 --- a/dags/create_tables.py +++ b/dags/create_tables.py @@ -6,7 +6,7 @@ from airflow.exceptions import ParamValidationError from airflow.models import Param, TaskInstance, DagRun -from lib.config import default_config_file, default_params, K8sContext, variant_task_jar +from lib.config import spark_small_conf, default_config_file, default_params, variant_jar, etl_base_config from lib.operators.spark import SparkOperator # Update default params @@ -21,11 +21,12 @@ }) with DAG( - dag_id='create_tables', + dag_id='create-tables', start_date=datetime(2022, 1, 1), schedule_interval=None, params=params, ) as dag: + @task(task_id='get_dataset_ids') def get_dataset_ids(**kwargs) -> List[str]: ti: TaskInstance = kwargs['ti'] @@ -41,7 +42,7 @@ def get_dataset_ids(**kwargs) -> List[str]: class CreateTableAndView(SparkOperator): - template_fields = SparkOperator.template_fields + ('arguments', 'dataset_ids',) + template_fields = [*SparkOperator.template_fields,'arguments', 'dataset_ids'] def __init__(self, dataset_ids, @@ -55,15 +56,16 @@ def execute(self, **kwargs): self.arguments = self.arguments + self.dataset_ids super().execute(**kwargs) - - CreateTableAndView(task_id='create_table_and_view', - name='create-table-and-view', - k8s_context=K8sContext.DEFAULT, - spark_jar=variant_task_jar, - spark_class='bio.ferlab.datalake.spark3.hive.CreateTableAndView', - spark_config='etl-task-small', - arguments=['--config', default_config_file, - '--steps', 'default', - '--app-name', 'create_table_and_view', - ], - dataset_ids=get_dataset_ids()) + etl_base_config.add_spark_conf(spark_small_conf) \ + 
.with_spark_class('bio.ferlab.datalake.spark3.hive.CreateTableAndView') \ + .with_spark_jar(variant_jar) \ + .args('--config', default_config_file, + '--steps', 'default', + '--app-name', 'create_table_and_view' + ).operator( + class_to_instantiate=CreateTableAndView, + task_id = 'create_table_and_view', + name = 'create-table-and-view', + dataset_ids=get_dataset_ids() + ) + diff --git a/dags/es_templates_update.py b/dags/es_templates_update.py index 1f7a955..553f170 100644 --- a/dags/es_templates_update.py +++ b/dags/es_templates_update.py @@ -1,8 +1,7 @@ from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator from kubernetes.client import models as k8s -from lib import config -from lib.config import env +from lib.config import datalake_bucket, kube_config, aws_endpoint, aws_secret_name, aws_secret_access_key, aws_secret_secret_key script = f""" #!/bin/bash @@ -33,46 +32,47 @@ curl https://raw.githubusercontent.com/Ferlab-Ste-Justine/etl-cqdg-portal/master/index-task/src/main/resources/templates/template_gene_suggestions.json --output ./templates/template_gene_suggestions.json echo Copy templates ... - mc cp ./templates/template_study_centric.json myminio/cqdg-{env}-app-datalake/templates/template_study_centric.json - mc cp ./templates/template_file_centric.json myminio/cqdg-{env}-app-datalake/templates/template_file_centric.json - mc cp ./templates/template_participant_centric.json myminio/cqdg-{env}-app-datalake/templates/template_participant_centric.json - mc cp ./templates/template_biospecimen_centric.json myminio/cqdg-{env}-app-datalake/templates/template_biospecimen_centric.json + mc cp ./templates/template_study_centric.json myminio/{datalake_bucket}/templates/template_study_centric.json + mc cp ./templates/template_file_centric.json myminio/{datalake_bucket}/templates/template_file_centric.json + mc cp ./templates/template_participant_centric.json myminio/{datalake_bucket}/templates/template_participant_centric.json + mc cp ./templates/template_biospecimen_centric.json myminio/{datalake_bucket}/templates/template_biospecimen_centric.json - mc cp ./templates/template_variant_centric.json myminio/cqdg-{env}-app-datalake/templates/template_variant_centric.json - mc cp ./templates/template_gene_centric.json myminio/cqdg-{env}-app-datalake/templates/template_gene_centric.json - mc cp ./templates/template_variant_suggestions.json myminio/cqdg-{env}-app-datalake/templates/template_variant_suggestions.json - mc cp ./templates/template_gene_suggestions.json myminio/cqdg-{env}-app-datalake/templates/template_gene_suggestions.json + mc cp ./templates/template_variant_centric.json myminio/{datalake_bucket}/templates/template_variant_centric.json + mc cp ./templates/template_gene_centric.json myminio/{datalake_bucket}/templates/template_gene_centric.json + mc cp ./templates/template_variant_suggestions.json myminio/{datalake_bucket}/templates/template_variant_suggestions.json + mc cp ./templates/template_gene_suggestions.json myminio/{datalake_bucket}/templates/template_gene_suggestions.json """ -es_templates_update = KubernetesPodOperator( - task_id='es_templates_update', - name='es-templates-update', - image="alpine:3.14", - is_delete_operator_pod=True, - cmds=["sh", "-cx"], - arguments=[script], - namespace=config.k8s_namespace, - env_vars=[ - k8s.V1EnvVar( - name='AWS_ENDPOINT', - value='https://objets.juno.calculquebec.ca', - ), - k8s.V1EnvVar( - name='AWS_ACCESS_KEY_ID', - value_from=k8s.V1EnvVarSource( - secret_key_ref=k8s.V1SecretKeySelector( - 
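
The create_tables and arranger hunks above replace hand-written operator subclasses (with k8s_context, named spark_config profiles, etc.) by chained config objects: add_spark_conf, with_spark_class, with_spark_jar, args and a final operator(...) call. The real SparkOperatorConfig lives in lib/operators/spark.py and is not part of this diff; the sketch below is only a hypothetical illustration of the chaining contract these call sites rely on, assuming every method returns a modified copy so a shared base config is never mutated (the class name and fields below are invented for the example).

```python
from dataclasses import dataclass, field, replace
from typing import Dict, List


@dataclass(frozen=True)
class FluentSparkConfig:  # hypothetical stand-in for SparkOperatorConfig
    spark_configs: List[Dict[str, str]] = field(default_factory=list)
    spark_class: str = ''
    spark_jar: str = ''
    arguments: List[str] = field(default_factory=list)

    def add_spark_conf(self, *confs: Dict[str, str]) -> 'FluentSparkConfig':
        # return a copy with extra conf dicts layered on top of the existing ones
        return replace(self, spark_configs=[*self.spark_configs, *confs])

    def with_spark_class(self, spark_class: str) -> 'FluentSparkConfig':
        return replace(self, spark_class=spark_class)

    def with_spark_jar(self, spark_jar: str) -> 'FluentSparkConfig':
        return replace(self, spark_jar=spark_jar)

    def args(self, *args: str) -> 'FluentSparkConfig':
        return replace(self, arguments=[*self.arguments, *args])

    def prepend_args(self, *args: str) -> 'FluentSparkConfig':
        return replace(self, arguments=[*args, *self.arguments])

    # .operator(...) would finally turn the accumulated state into a SparkOperator;
    # that part depends on the real lib code and is left out of this sketch.
```

Under that assumption, reusing etl_base_config across DAGs is safe: create_tables layering spark_small_conf on top of it cannot leak into the other pipelines.
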
name='ceph-s3-credentials', - key='access', - ), +def es_templates_update(): + return KubernetesPodOperator( + task_id='es_templates_update', + name='es-templates-update', + image="alpine:3.14", + is_delete_operator_pod=True, + cmds=["sh", "-cx"], + arguments=[script], + namespace=kube_config.namespace, + env_vars=[ + k8s.V1EnvVar( + name='AWS_ENDPOINT', + value=aws_endpoint, ), - ), - k8s.V1EnvVar( - name='AWS_SECRET_ACCESS_KEY', - value_from=k8s.V1EnvVarSource( - secret_key_ref=k8s.V1SecretKeySelector( - name='ceph-s3-credentials', - key='secret', + k8s.V1EnvVar( + name='AWS_ACCESS_KEY_ID', + value_from=k8s.V1EnvVarSource( + secret_key_ref=k8s.V1SecretKeySelector( + name=aws_secret_name, + key=aws_secret_access_key, + ), ), ), - ), ] -) + k8s.V1EnvVar( + name='AWS_SECRET_ACCESS_KEY', + value_from=k8s.V1EnvVarSource( + secret_key_ref=k8s.V1SecretKeySelector( + name=aws_secret_name, + key=aws_secret_secret_key, + ), + ), + ), ] + ) diff --git a/dags/etl.py b/dags/etl.py index aa688ee..e4d36f4 100644 --- a/dags/etl.py +++ b/dags/etl.py @@ -2,12 +2,13 @@ from airflow.models.param import Param from airflow.utils.task_group import TaskGroup from datetime import datetime -from lib import config -from lib.operators.fhavro import FhavroOperator -from lib.config import env, K8sContext -from lib.operators.spark import SparkOperator -from lib.operators.arranger import ArrangerOperator +from lib.config import env, study_ids, release_id, es_port, es_url, etl_publish_config, project from es_templates_update import es_templates_update +from etl_import import etl_import +from arranger import arranger_task +from etl_fhavro_export import fhavro_export +from etl_index import index_operator +from etl_prepare_index import prepare_index with DAG( dag_id='etl', @@ -20,148 +21,19 @@ 'es_port': Param('9200', type='string') }, ) as dag: - def release_id() -> str: - return '{{ params.release_id }}' - - - def study_ids() -> str: - return '{{ params.study_ids }}' - - - def project() -> str: - return '{{ params.project }}' - - - def es_port() -> str: - return '{{ params.es_port }}' - - - fhavro_export = FhavroOperator( - task_id='fhavro_export', - name='etl-fhavro_export', - k8s_context=K8sContext.DEFAULT, - cmds=['java', - '-cp', - 'fhavro-export.jar', - 'bio/ferlab/fhir/etl/FhavroExport', - release_id(), study_ids(), env - ], - ) - - import_task = SparkOperator( - task_id='import_task', - name='etl-import-task', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.spark_import_jar, - spark_class='bio.ferlab.fhir.etl.ImportTask', - spark_config='etl-task-small', - arguments=[f'config/{env}-{project()}.conf', 'default', release_id(), study_ids()], - ) - - prepare_index = SparkOperator( - task_id='prepare_index', - name='etl-prepare-index', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.spark_prepare_index_jar, - spark_class='bio.ferlab.fhir.etl.PrepareIndex', - spark_config='etl-task-small', - arguments=[f'config/{env}-{project()}.conf', 'default', 'all', study_ids()], - ) with TaskGroup(group_id='index') as index: - study_centric = SparkOperator( - task_id='study_centric', - name='etl-index-study-centric', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.spark_index_jar, - spark_class='bio.ferlab.fhir.etl.IndexTask', - spark_config='etl-task-small', - arguments=[release_id(), study_ids(), 'study_centric', env, project(), config.es_url, es_port()], - ) - - participant_centric = SparkOperator( - task_id='participant_centric', - name='etl-index-participant-centric', - k8s_context=K8sContext.DEFAULT, - 
spark_jar=config.spark_index_jar, - spark_class='bio.ferlab.fhir.etl.IndexTask', - spark_config='etl-task-small', - arguments=[release_id(), study_ids(), 'participant_centric', env, project(), config.es_url, es_port()], - ) - - file_centric = SparkOperator( - task_id='file_centric', - name='etl-index-file-centric', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.spark_index_jar, - spark_class='bio.ferlab.fhir.etl.IndexTask', - spark_config='etl-task-small', - arguments=[release_id(), study_ids(), 'file_centric', env, project(), config.es_url, es_port()], - ) - - biospecimen_centric = SparkOperator( - task_id='biospecimen_centric', - name='etl-index-biospecimen-centric', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.spark_index_jar, - spark_class='bio.ferlab.fhir.etl.IndexTask', - spark_config='etl-task-small', - arguments=[release_id(), study_ids(), 'biospecimen_centric', env, project(), config.es_url, es_port()], - ) - study_centric >> participant_centric >> file_centric >> biospecimen_centric + index_operator('study') >> index_operator('participant') >> index_operator('file') >> index_operator('biospecimen') with TaskGroup(group_id='publish') as publish: - study_centric = SparkOperator( - task_id='study_centric', - name='etl-publish-study-centric', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.spark_publish_jar, - spark_class='bio.ferlab.fhir.etl.PublishTask', - spark_config='etl-task-small', - arguments=[config.es_url, es_port(), env, project(), release_id(), study_ids(), 'study_centric'], - ) - - participant_centric = SparkOperator( - task_id='participant_centric', - name='etl-publish-participant-centric', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.spark_publish_jar, - spark_class='bio.ferlab.fhir.etl.PublishTask', - spark_config='etl-task-small', - arguments=[config.es_url, es_port(), env, project(), release_id(), study_ids(), 'participant_centric'], - ) - - file_centric = SparkOperator( - task_id='file_centric', - name='etl-publish-file-centric', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.spark_publish_jar, - spark_class='bio.ferlab.fhir.etl.PublishTask', - spark_config='etl-task-small', - arguments=[config.es_url, es_port(), env, project(), release_id(), study_ids(), 'file_centric'], - ) - - biospecimen_centric = SparkOperator( - task_id='biospecimen_centric', - name='etl-publish-biospecimen-centric', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.spark_publish_jar, - spark_class='bio.ferlab.fhir.etl.PublishTask', - spark_config='etl-task-small', - arguments=[config.es_url, es_port(), env, project(), release_id(), study_ids(), 'biospecimen_centric'], - ) - - study_centric >> participant_centric >> file_centric >> biospecimen_centric - - arranger_update_project = ArrangerOperator( - task_id='arranger_update_project', - name='etl-publish-arranger-update-project', - k8s_context=K8sContext.DEFAULT, - cmds=['node', - '--experimental-modules=node', - '--es-module-specifier-resolution=node', - 'admin/run.mjs' - ], - ) - - fhavro_export >> import_task >> prepare_index >> es_templates_update >> index >> publish >> arranger_update_project + def publish_operator(name:str): + return etl_publish_config \ + .args(es_url, es_port, env, project, release_id, study_ids, f'{name}_centric') \ + .operator( + task_id=f'{name}_centric', + name=f'etl-publish-{name}-centric' + ) + + publish_operator('study') >> publish_operator('participant') >> publish_operator('file') >> publish_operator('biospecimen') + + fhavro_export() >> etl_import() >> prepare_index() >> 
es_templates_update() >> index >> publish >> arranger_task() diff --git a/dags/etl_enrich_snv.py b/dags/etl_enrich_snv.py new file mode 100644 index 0000000..c389586 --- /dev/null +++ b/dags/etl_enrich_snv.py @@ -0,0 +1,36 @@ +from airflow import DAG +from airflow.models.param import Param +from datetime import datetime +from lib.config import default_config_file, etl_base_config, spark_medium_conf, variant_jar, study_id, dataset, batch +from lib.operators.spark import SparkOperator + +with DAG( + dag_id='etl-enrich-snv', + start_date=datetime(2022, 1, 1), + schedule_interval=None, + params={ + 'study_id': Param('ST0000002', type='string'), + 'dataset': Param('dataset_default', type='string'), + 'batch': Param('annotated_vcf', type='string'), + 'project': Param('cqdg', type='string'), + }, +) as dag: + + variant_task_enrich_snv = etl_base_config \ + .with_spark_jar(variant_jar) \ + .with_spark_class('bio.ferlab.etl.enriched.RunEnrichGenomic') \ + .add_spark_conf(spark_medium_conf) \ + .args('snv') \ + .args( + '--config', default_config_file, + '--steps', 'default', + '--app-name', 'variant_task_enrich_snv', + '--study-id', study_id, + '--dataset', dataset, + '--batch', batch + ).operator( + task_id='variant_task_variant_enrich_snv', + name='etl-variant_task_variant_enrich_snv' + ) + + variant_task_enrich_snv diff --git a/dags/etl_enrich_specimens.py b/dags/etl_enrich_specimens.py index c36bca8..897d322 100644 --- a/dags/etl_enrich_specimens.py +++ b/dags/etl_enrich_specimens.py @@ -1,12 +1,11 @@ from airflow import DAG from airflow.models.param import Param from datetime import datetime -from lib import config -from lib.config import env, Env, K8sContext -from lib.operators.spark import SparkOperator +from lib.config import default_config_file, study_ids +from etl_prepare_index import etl_base_config, spark_small_conf, prepare_index_jar with DAG( - dag_id='etl_enrich_specimen', + dag_id='etl-enrich-specimen', start_date=datetime(2022, 1, 1), schedule_interval=None, params={ @@ -15,23 +14,18 @@ }, ) as dag: - def study_ids() -> str: - return '{{ params.study_ids }}' - - def project() -> str: - return '{{ params.project }}' - - - enrich_specimen = SparkOperator( - task_id='enrich-specimen', - name='etl-enrich-specimen', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.spark_prepare_index_jar, - spark_class='bio.ferlab.fhir.etl.Enrich', - spark_config='etl-task-small', - arguments=['--config', f'config/{env}-{project()}.conf', - '--steps', 'default', - '--app-name', 'enrich_specimen', - '--study-id', study_ids()], - ) + etl_base_config \ + .add_spark_conf(spark_small_conf) \ + .with_spark_jar(prepare_index_jar) \ + .with_spark_class('bio.ferlab.fhir.etl.Enrich') \ + .args( + '--config', default_config_file, + '--steps', 'default', + '--app-name', 'enrich_specimen', + '--study-id', study_ids + ).operator( + task_id='enrich-specimen', + name='etl-enrich-specimen' + ) + diff --git a/dags/etl_enrich_variants.py b/dags/etl_enrich_variants.py new file mode 100644 index 0000000..dc44f2c --- /dev/null +++ b/dags/etl_enrich_variants.py @@ -0,0 +1,32 @@ +from airflow import DAG +from airflow.models.param import Param +from datetime import datetime +from lib.config import etl_variant_config, default_config_file +from lib.operators.spark import SparkOperator + +etl_variant_enrich_config = etl_variant_config \ + .with_spark_class('bio.ferlab.etl.enriched.RunEnrichGenomic') \ + .args('--config', default_config_file, + '--steps', 'default' + ) +with DAG( + dag_id='etl-enrich-variants', + 
start_date=datetime(2022, 1, 1), + schedule_interval=None, + params={ + 'project': Param('cqdg', type='string'), + }, +) as dag: + + variant_task_enrich_variants = etl_variant_enrich_config.prepend_args('variants').operator( + task_id='variant_task_variant_enrich_variants', + name='etl-variant_task_variant_enrich_variants' + ) + + variant_task_enrich_consequences = etl_variant_enrich_config.prepend_args('consequences').operator( + task_id='variant_task_variant_enrich_consequences', + name='etl-variant_task_variant_enrich_consequences' + ) + + + variant_task_enrich_variants >> variant_task_enrich_consequences diff --git a/dags/etl_fhavro_export.py b/dags/etl_fhavro_export.py new file mode 100644 index 0000000..4f8c96f --- /dev/null +++ b/dags/etl_fhavro_export.py @@ -0,0 +1,36 @@ +from airflow import DAG +from airflow.models import Param, Variable +from datetime import datetime +from lib.config import release_id, study_ids, env, fhir_url, datalake_bucket, keycloak_client_secret_name, keycloak_url, aws_secret_name, aws_secret_access_key, aws_secret_secret_key, kube_config, aws_endpoint +from lib.operators.fhavro import FhavroConfig + +fhavro_config = FhavroConfig( + fhir_url=fhir_url, + bucket_name=datalake_bucket, + keycloak_client_secret_name = keycloak_client_secret_name, + keycloak_url=keycloak_url, + aws_endpoint= aws_endpoint, + aws_credentials_secret_name= aws_secret_name, + aws_credentials_secret_access_key=aws_secret_access_key, + aws_credentials_secret_secret_key=aws_secret_secret_key, + kube_config=kube_config, + image=Variable.get('fhavro_export_image') +) + +def fhavro_export(): + return fhavro_config.args(release_id, study_ids, env).operator( + task_id='fhavro_export', + name='etl-fhavro_export' + ) + +with DAG( + dag_id='fhavro-export', + start_date=datetime(2022, 1, 1), + schedule_interval=None, + params={ + 'release_id': Param('7', type='string'), + 'study_ids': Param('ST0000017', type='string'), + 'project': Param('cqdg', type='string'), + }, +) as dag: + fhavro_export() diff --git a/dags/etl_fhir_import.py b/dags/etl_fhir_import.py new file mode 100644 index 0000000..aaf68dd --- /dev/null +++ b/dags/etl_fhir_import.py @@ -0,0 +1,60 @@ +from datetime import datetime + +from airflow import DAG +from airflow.models import Param, Variable + +from lib.config import fhir_url, keycloak_client_secret_name, keycloak_url, aws_secret_name, aws_secret_access_key, aws_secret_secret_key, clinical_data_bucket, file_import_bucket, kube_config +from lib.operators.fhir_import import FhirCsvOperator, FhirCsvConfig + +fhir_import_config = FhirCsvConfig( + fhir_url=fhir_url, + keycloak_client_secret_name = keycloak_client_secret_name, + keycloak_url=keycloak_url, + aws_credentials_secret_name= aws_secret_name, + aws_credentials_secret_access_key=aws_secret_access_key, + aws_credentials_secret_secret_key=aws_secret_secret_key, + clinical_data_bucket_name=clinical_data_bucket, + file_import_bucket_name=file_import_bucket, + id_service_url=Variable.get('id_service_url'), + kube_config=kube_config, + image=Variable.get('fhir_import_image') +).args("bio/ferlab/cqdg/etl/FhirImport") + +with DAG( + dag_id='etl-fhir-import', + start_date=datetime(2022, 1, 1), + schedule_interval=None, + params={ + 'prefix': Param('prefix', type='string'), + 'studyId': Param('7', type='string'), + 'studyVersion': Param('13', type='string'), + 'study': Param('cag', type='string'), + 'project': Param('jmichaud', type='string'), + 'is_restricted': Param('', enum=['', 'true', 'false']), + }, +) as dag: + + def prefix() -> 
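
In etl-enrich-variants above, the option-style arguments ('--config', '--steps') stay on the shared etl_variant_enrich_config while each task injects its positional subcommand with prepend_args, so 'variants' or 'consequences' ends up first on the command line, ahead of the options. A standalone illustration of that ordering, assuming prepend_args simply places its values in front of the arguments accumulated so far (the helper name and config path below are illustrative, not the real lib code):

```python
# Arguments accumulated on the shared config (config path is an illustrative value).
shared_args = ['--config', 'config/qa-cqdg.conf', '--steps', 'default']


def prepend_args(existing, *new):
    # assumed behaviour: new values go in front of what was already accumulated
    return [*new, *existing]


print(prepend_args(shared_args, 'variants'))
# ['variants', '--config', 'config/qa-cqdg.conf', '--steps', 'default']
print(prepend_args(shared_args, 'consequences'))
# ['consequences', '--config', 'config/qa-cqdg.conf', '--steps', 'default']
```
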
str: + return '{{ params.prefix }}' + + def study_clin_data_id() -> str: + return '{{ params.studyId }}' + + def study_clin_data_version() -> str: + return '{{ params.studyVersion }}' + + def study() -> str: + return '{{ params.study }}' + + def project() -> str: + return '{{ params.project }}' + + def is_restricted() -> str: + return '{{ params.is_restricted }}' + + csv_import = fhir_import_config \ + .args(prefix(), study_clin_data_id(), study_clin_data_version(), study(), project(), "true", is_restricted()) \ + .operator( + task_id='fhir_import', + name='etl-fhir_import', + ) diff --git a/dags/etl_import.py b/dags/etl_import.py new file mode 100644 index 0000000..2503562 --- /dev/null +++ b/dags/etl_import.py @@ -0,0 +1,28 @@ +from airflow import DAG +from airflow.models.param import Param +from datetime import datetime +from lib.config import etl_base_config, spark_small_conf, import_jar, default_config_file, release_id, study_ids +from lib.operators.spark import SparkOperator + +def etl_import(): + return etl_base_config \ + .with_spark_class('bio.ferlab.fhir.etl.ImportTask') \ + .with_spark_jar(import_jar) \ + .add_spark_conf(spark_small_conf) \ + .args(default_config_file, 'default', release_id, study_ids) \ + .operator( + task_id='import_task', + name='etl-import-task' + ) + +with DAG( + dag_id='etl-import', + start_date=datetime(2022, 1, 1), + schedule_interval=None, + params={ + 'release_id': Param('7', type='string'), + 'study_ids': Param('ST0000017', type='string'), + 'project': Param('cqdg', type='string'), + }, +) as dag: + etl_import() diff --git a/dags/etl_import_fhir.py b/dags/etl_import_fhir.py deleted file mode 100644 index 13a542c..0000000 --- a/dags/etl_import_fhir.py +++ /dev/null @@ -1,47 +0,0 @@ -from datetime import datetime - -from airflow import DAG -from airflow.models.param import Param - -from lib.config import K8sContext -from lib.operators.fhir_import import FhirCsvOperator - -with DAG( - dag_id='etl_import_fhir', - start_date=datetime(2022, 1, 1), - schedule_interval=None, - params={ - 'prefix': Param('prefix', type='string'), - 'studyId': Param('7', type='string'), - 'studyVersion': Param('13', type='string'), - 'study': Param('cag', type='string'), - 'project': Param('jmichaud', type='string'), - 'is_restricted': Param('', enum=['', 'true', 'false']), - }, -) as dag: - - def prefix() -> str: - return '{{ params.prefix }}' - - def study_clin_data_id() -> str: - return '{{ params.studyId }}' - - def study_clin_data_version() -> str: - return '{{ params.studyVersion }}' - - def study() -> str: - return '{{ params.study }}' - - def project() -> str: - return '{{ params.project }}' - - def is_restricted() -> str: - return '{{ params.is_restricted }}' - - csv_import = FhirCsvOperator( - task_id='fhir_import', - name='etl-fhir_import', - k8s_context=K8sContext.DEFAULT, - arguments=["-cp", "cqdg-fhir-import.jar", "bio/ferlab/cqdg/etl/FhirImport", - prefix(), study_clin_data_id(), study_clin_data_version(), study(), project(), "true", is_restricted()], - ) diff --git a/dags/etl_index.py b/dags/etl_index.py new file mode 100644 index 0000000..ac33f9f --- /dev/null +++ b/dags/etl_index.py @@ -0,0 +1,23 @@ +from airflow import DAG +from airflow.models.param import Param +from datetime import datetime +from lib.config import etl_index_config, env, es_url, es_port, study_ids, release_id, project, etl_index_config +def index_operator(name:str): + return etl_index_config.with_spark_class('bio.ferlab.fhir.etl.IndexTask') \ + .args(release_id, study_ids, f'{name}_centric', 
env, project, es_url, es_port) \ + .operator( + task_id=f'{name}_centric', + name=f'etl-index-{name}-centric' + ) +with DAG( + dag_id='etl-index', + start_date=datetime(2022, 1, 1), + schedule_interval=None, + params={ + 'release_id': Param('7', type='string'), + 'study_ids': Param('ST0000017', type='string'), + 'project': Param('cqdg', type='string') + }, +) as dag: + index_operator('study') >> index_operator('participant') >> index_operator('file') >> index_operator('biospecimen') + diff --git a/dags/etl_index_variants.py b/dags/etl_index_variants.py new file mode 100644 index 0000000..9b74b46 --- /dev/null +++ b/dags/etl_index_variants.py @@ -0,0 +1,47 @@ +from airflow import DAG +from airflow.models.param import Param +from airflow.utils.task_group import TaskGroup +from datetime import datetime +from lib.config import es_port, es_url, release_id, default_config_file, etl_index_config, datalake_bucket + +etl_index_variant_config = etl_index_config \ + .with_spark_class('bio.ferlab.fhir.etl.VariantIndexTask') + +with DAG( + dag_id='etl-index-variants', + start_date=datetime(2022, 1, 1), + schedule_interval=None, + params={ + 'release_id': Param('7', type='string'), + 'project': Param('cqdg', type='string') + }, +) as dag: + + chromosomes = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', + '19', '20', '21', '22', 'X', 'Y', 'M'] + + def operator(indexName:str, chromosome:str='all'): + indexNameU = indexName.replace('_', '-') + return etl_index_variant_config \ + .args(es_port, release_id, indexName, default_config_file, f's3a://{datalake_bucket}/es_index/{indexName}/', chromosome, es_url) \ + .operator( + task_id=f'{indexName}_index', + name=f'etl-index-{indexNameU}-index' + ) + + def genes(): + return operator('gene_centric') >> operator('gene_suggestions') + + + def variants(chr): + with TaskGroup(group_id=f'variant_index-{chr}') as variant_index: + operator('variant_centric', chr) >> operator('variant_suggestions', chr) + return variant_index + + + task_arr = [genes()] + + for chr in chromosomes: + task = variants(chr) + task_arr[-1] >> task + task_arr.append(task) diff --git a/dags/etl_normalize_variants.py b/dags/etl_normalize_variants.py new file mode 100644 index 0000000..42d97ef --- /dev/null +++ b/dags/etl_normalize_variants.py @@ -0,0 +1,107 @@ +from airflow import DAG +from airflow.models.param import Param +from datetime import datetime +from lib.config import release_id, batch, kube_config, default_config_file, study_id, datalake_bucket, aws_secret_name, aws_secret_access_key, aws_secret_secret_key, aws_endpoint, hive_metastore_uri, spark_large_conf +from kubernetes.client import models as k8s +from lib.operators.spark import SparkOperatorConfig +spark_default_conf = { + 'spark.sql.shuffle.partitions' : '1000', + 'spark.sql.extensions' : 'io.delta.sql.DeltaSparkSessionExtension', + 'spark.sql.catalog.spark_catalog' : 'org.apache.spark.sql.delta.catalog.DeltaCatalog', + 'spark.hadoop.fs.s3a.impl' : 'org.apache.hadoop.fs.s3a.S3AFileSystem', + 'spark.hadoop.fs.s3a.fast.upload' : 'true', + 'spark.hadoop.fs.s3a.connection.ssl.enabled' : 'true', + 'spark.hadoop.fs.s3a.path.style.access' : 'true', + 'spark.hadoop.fs.s3a.endpoint' : aws_endpoint, + 'spark.hadoop.fs.s3a.aws.credentials.provider' : 'com.amazonaws.auth.EnvironmentVariableCredentialsProvider', + 'spark.kubernetes.driver.secretKeyRef.AWS_ACCESS_KEY_ID' : f'{aws_secret_name}:{aws_secret_access_key}', + 'spark.kubernetes.driver.secretKeyRef.AWS_SECRET_ACCESS_KEY' : 
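
The etl-index-variants DAG above chains one variant task group per chromosome by always wiring the new group downstream of the last element of task_arr, so indexing runs as genes >> chromosome 1 >> chromosome 2 >> … rather than in parallel. The same idiom in a minimal standalone DAG, assuming Airflow 2.3+ where EmptyOperator is available (DAG id and task names are illustrative):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(dag_id='chain-demo', start_date=datetime(2022, 1, 1), schedule_interval=None) as dag:
    chromosomes = ['1', '2', 'X', 'Y', 'M']  # abbreviated list for the example
    task_arr = [EmptyOperator(task_id='genes')]
    for chr in chromosomes:
        task = EmptyOperator(task_id=f'variant_index-{chr}')
        task_arr[-1] >> task  # each chromosome group waits for the previous one
        task_arr.append(task)
```
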
f'{aws_secret_name}:{aws_secret_secret_key}',
+    'spark.kubernetes.executor.secretKeyRef.AWS_ACCESS_KEY_ID' : f'{aws_secret_name}:{aws_secret_access_key}',
+    'spark.kubernetes.executor.secretKeyRef.AWS_SECRET_ACCESS_KEY' : f'{aws_secret_name}:{aws_secret_secret_key}',
+    'spark.hadoop.hive.metastore.uris' : hive_metastore_uri,
+    'spark.sql.warehouse.dir' : f's3a://{datalake_bucket}/hive',
+    'spark.eventLog.enabled' : 'true',
+    'spark.eventLog.dir' : f's3a://{datalake_bucket}/spark-logs',
+    'spark.driver.extraJavaOptions' : '"-Divy.cache.dir=/tmp -Divy.home=/tmp"',
+    'spark.jars.ivy': '/tmp'
+}
+
+normalized_etl = SparkOperatorConfig(
+        spark_configs=[spark_default_conf, spark_large_conf],
+        image = 'apache/spark:3.4.1',
+        kube_config=kube_config,
+        is_delete_operator_pod=False
+    ) \
+    .with_spark_class('bio.ferlab.etl.normalized.RunNormalizedGenomic').args(
+        '--config', default_config_file,
+        '--steps', 'default',
+        '--app-name', 'variant_task_consequences',
+        '--owner', '{{ params.owner }}',
+        '--dataset', '{{ params.dataset }}',
+        '--batch', batch,
+        '--study-id', study_id,
+        '--study-code', '{{ params.study_code }}'
+) \
+.with_spark_jar('s3a://cqdg-qa-app-datalake/jars/variant-task.jar') \
+.add_package('io.delta:delta-core_2.12:2.3.0') \
+.add_spark_conf({
+    # 'spark.jars.packages': 'io.delta:delta-core_2.12:2.3.0',
+    # 'spark.jars.packages': 'io.delta:delta-core_2.12:2.1.1,io.projectglow:glow-spark3_2.12:1.2.1',
+    # 'spark.jars.excludes':'org.apache.hadoop:hadoop-client',
+    'spark.kubernetes.container.image': 'ferlabcrsj/spark:469f2fc61f06fcfa73c1480a5e5f73f59768152d',
+    # 'spark.kubernetes.container.image': 'apache/spark:3.4.1',
+    # 'spark.kubernetes.container.image': 'apache/spark:v3.3.2',
+    # 'spark.kubernetes.file.upload.path': f's3a://{datalake_bucket}/dependencies'
+}) #.with_spark_jar('https://github.com/Ferlab-Ste-Justine/etl-cqdg-portal/releases/download/v2.21.5/variant-task.jar')
+
+###
+# Image/jar combinations tested so far:
+# - ferlabcrsj/spark:469f2fc61f06fcfa73c1480a5e5f73f59768152d (spark 3.3.2 + hadoop-aws) + https://github.com/Ferlab-Ste-Justine/etl-cqdg-portal/releases/download/v2.21.5/variant-task.jar => works
+# - apache/spark:3.3.2 + https://github.com/Ferlab-Ste-Justine/etl-cqdg-portal/releases/download/v2.21.5/variant-task.jar + hadoop-aws in packages => does not work: classpath problem (java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found)
+# - apache/spark:3.4.1 + https://github.com/Ferlab-Ste-Justine/etl-cqdg-portal/releases/download/v2.21.5/variant-task.jar => does not work: incompatible Delta version
+# - ferlabcrsj/spark:469f2fc61f06fcfa73c1480a5e5f73f59768152d (spark 3.3.2 + hadoop-aws) + new variant-task.jar with delta 2.1.1 => does not work: java.lang.NoClassDefFoundError: io/delta/tables/DeltaTable$
+
+
+# s3a://cqdg-qa-app-datalake/jars/variant-task.jar
+###
+
+def normalize_variant_operator(name):
+    etl = normalized_etl.args('--release-id', release_id) if name == 'snv' else normalized_etl
+    return etl.prepend_args(name).operator(
+        task_id=f'normalize-{name}',
+        name=f'normalize-{name}',
+        env_vars=[
+            k8s.V1EnvVar(
+                name='AWS_ACCESS_KEY_ID',
+                value_from=k8s.V1EnvVarSource(
+                    secret_key_ref=k8s.V1SecretKeySelector(
+                        name=aws_secret_name,
+                        key=aws_secret_access_key,
+                    ),
+                ),
+            ),
+            k8s.V1EnvVar(
+                name='AWS_SECRET_ACCESS_KEY',
+                value_from=k8s.V1EnvVarSource(
+                    secret_key_ref=k8s.V1SecretKeySelector(
+                        name=aws_secret_name,
+                        key=aws_secret_secret_key,
+                    ),
+                ),
+            ), ]
+    )
+
+with DAG(
+    dag_id='etl-normalize-variants',
start_date=datetime(2022, 1, 1), + schedule_interval=None, + params={ + 'study_id': Param('ST0000002', type='string'), + 'study_code': Param('study1', type='string'), + 'owner': Param('jmichaud', type='string'), + 'release_id': Param('1', type='string'), + 'dataset': Param('dataset_default', type='string'), + 'batch': Param('annotated_vcf', type='string'), + 'project': Param('cqdg', type='string'), + }, +) as dag: + normalize_variant_operator('snv') >> normalize_variant_operator('consequences') diff --git a/dags/etl_prepare_index.py b/dags/etl_prepare_index.py new file mode 100644 index 0000000..457c548 --- /dev/null +++ b/dags/etl_prepare_index.py @@ -0,0 +1,29 @@ +from airflow import DAG +from airflow.models.param import Param +from datetime import datetime +from lib.config import default_config_file, etl_base_config, spark_small_conf, prepare_index_jar, study_ids + +etl_prepare_config = etl_base_config \ + .add_spark_conf(spark_small_conf) \ + .with_spark_jar(prepare_index_jar) \ + .with_spark_class('bio.ferlab.fhir.etl.PrepareIndex') \ + .args(default_config_file, 'default', 'all', study_ids) + +def prepare_index(): + return etl_prepare_config \ + .operator( + task_id='prepare_index', + name='etl-prepare-index' + ) +with DAG( + dag_id='etl-prepare-index', + start_date=datetime(2022, 1, 1), + schedule_interval=None, + params={ + 'release_id': Param('7', type='string'), + 'study_ids': Param('ST0000017', type='string'), + 'project': Param('cqdg', type='string'), + }, +) as dag: + prepare_index() + diff --git a/dags/etl_prepare_index_variants.py b/dags/etl_prepare_index_variants.py new file mode 100644 index 0000000..aaa3701 --- /dev/null +++ b/dags/etl_prepare_index_variants.py @@ -0,0 +1,41 @@ +from airflow import DAG +from airflow.models.param import Param +from datetime import datetime +from lib.config import etl_variant_config, default_config_file +from lib.operators.spark import SparkOperator + +etl_variant_prepared_config = etl_variant_config \ + .with_spark_class('bio.ferlab.etl.prepared.RunPrepared') \ + .args('--config', default_config_file, + '--steps', 'default' + ) +with DAG( + dag_id='etl-prepare-index-variant', + start_date=datetime(2022, 1, 1), + schedule_interval=None, + params={ + 'project': Param('cqdg', type='string'), + }, +) as dag: + + variant_task_variant_centric = etl_variant_prepared_config.prepend_args('variant_centric').operator( + task_id='variant_task_variant_centric', + name='etl-variant_task_variant_centric' + ) + + variant_task_gene_centric = etl_variant_prepared_config.prepend_args('gene_centric').operator( + task_id='variant_task_gene_centric', + name='etl-variant_task_gene_centric' + ) + + variant_task_variant_suggestions = etl_variant_prepared_config.prepend_args('variant_suggestions').operator( + task_id='variant_task_variant_suggestions', + name='etl-variant_variant_suggestions' + ) + + variant_task_gene_suggestions = etl_variant_prepared_config.prepend_args('gene_suggestions').operator( + task_id='variant_task_gene_suggestions', + name='etl-variant_gene_suggestions' + ) + + variant_task_variant_centric >> variant_task_gene_centric >> variant_task_variant_suggestions >> variant_task_gene_suggestions diff --git a/dags/etl_snv_enrich.py b/dags/etl_snv_enrich.py deleted file mode 100644 index 03f22f7..0000000 --- a/dags/etl_snv_enrich.py +++ /dev/null @@ -1,47 +0,0 @@ -from airflow import DAG -from airflow.models.param import Param -from datetime import datetime -from lib import config -from lib.config import env, Env, K8sContext -from 
lib.operators.spark import SparkOperator - -with DAG( - dag_id='etl_snv_enrich', - start_date=datetime(2022, 1, 1), - schedule_interval=None, - params={ - 'study_id': Param('ST0000002', type='string'), - 'dataset': Param('dataset_default', type='string'), - 'batch': Param('annotated_vcf', type='string'), - 'project': Param('cqdg', type='string'), - }, -) as dag: - def project() -> str: - return '{{ params.project }}' - - def study_id() -> str: - return '{{ params.study_id }}' - - def dataset() -> str: - return '{{ params.dataset }}' - - def batch() -> str: - return '{{ params.batch }}' - - variant_task_enrich_snv = SparkOperator( - task_id='variant_task_variant_enrich_snv', - name='etl-variant_task_variant_enrich_snv', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.variant_task_jar, - spark_class='bio.ferlab.etl.enriched.RunEnrichGenomic', - spark_config='etl-task-xlarge', - arguments=['snv', - '--config', f'config/{env}-{project()}.conf', - '--steps', 'default', - '--app-name', 'variant_task_enrich_snv', - '--study-id', study_id(), - '--dataset', dataset(), - '--batch', batch()], - ) - - variant_task_enrich_snv diff --git a/dags/etl_variant_index.py b/dags/etl_variant_index.py deleted file mode 100644 index 2f12c3f..0000000 --- a/dags/etl_variant_index.py +++ /dev/null @@ -1,95 +0,0 @@ -from airflow import DAG -from airflow.models.param import Param -from airflow.utils.task_group import TaskGroup -from datetime import datetime -from lib import config -from lib.config import env, K8sContext -from lib.operators.spark import SparkOperator - -with DAG( - dag_id='variant-index_task', - start_date=datetime(2022, 1, 1), - schedule_interval=None, - params={ - 'release_id': Param('7', type='string'), - 'study_ids': Param('ST0000017', type='string'), - 'project': Param('cqdg', type='string'), - 'es_port': Param('9200', type='string') - }, -) as dag: - def release_id() -> str: - return '{{ params.release_id }}' - - def study_ids() -> str: - return '{{ params.study_ids }}' - - - def project() -> str: - return '{{ params.project }}' - - - def es_port() -> str: - return '{{ params.es_port }}' - - - chromosomes = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', - '19', '20', '21', '22', 'X', 'Y', 'M'] - - def genes(): - gene_centric_task = SparkOperator( - task_id=f'gene_centric_index', - name=f'etl-index-gene-centric-index', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.spark_index_jar, - spark_class='bio.ferlab.fhir.etl.VariantIndexTask', - spark_config='etl-task-small', - arguments=[es_port(), release_id(), 'gene_centric', f'config/{env}-{project()}.conf', - f's3a://cqdg-{env}-app-datalake/es_index/gene_centric/', "all", config.es_url], - ) - - gene_suggestions_task = SparkOperator( - task_id=f'gene_suggestions_index', - name=f'etl-index-gene-suggestions-index', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.spark_index_jar, - spark_class='bio.ferlab.fhir.etl.VariantIndexTask', - spark_config='etl-task-small', - arguments=[es_port(), release_id(), 'gene_suggestions', f'config/{env}-{project()}.conf', - f's3a://cqdg-{env}-app-datalake/es_index/gene_suggestions/', "all", config.es_url], - ) - return gene_centric_task >> gene_suggestions_task - - - def variants(c): - with TaskGroup(group_id=f'variant_index-{c}') as variant_index: - variant_centric_task = SparkOperator( - task_id=f'variant_centric_index', - name=f'etl-index-variant-centric-index', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.spark_index_jar, - 
spark_class='bio.ferlab.fhir.etl.VariantIndexTask', - spark_config='etl-task-small', - arguments=[es_port(), release_id(), 'variant_centric', f'config/{env}-{project()}.conf', - f's3a://cqdg-{env}-app-datalake/es_index/variant_centric/', c, config.es_url], - ) - - variants_suggestions_task = SparkOperator( - task_id=f'variant_suggestions_index', - name=f'etl-index-variant-suggestions-index', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.spark_index_jar, - spark_class='bio.ferlab.fhir.etl.VariantIndexTask', - spark_config='etl-task-small', - arguments=[es_port(), release_id(), 'variant_suggestions', f'config/{env}-{project()}.conf', - f's3a://cqdg-{env}-app-datalake/es_index/variant_suggestions/', c, config.es_url], - ) - variant_centric_task >> variants_suggestions_task - return variant_index - - - task_arr = [genes()] - - for chr in chromosomes: - task = variants(chr) - task_arr[-1] >> task - task_arr.append(task) diff --git a/dags/etl_variants.py b/dags/etl_variants.py deleted file mode 100644 index 38512e7..0000000 --- a/dags/etl_variants.py +++ /dev/null @@ -1,86 +0,0 @@ -from airflow import DAG -from airflow.models.param import Param -from datetime import datetime -from lib import config -from lib.config import env, Env, K8sContext -from lib.operators.spark import SparkOperator - -with DAG( - dag_id='etl_variant', - start_date=datetime(2022, 1, 1), - schedule_interval=None, - params={ - 'study_id': Param('ST0000002', type='string'), - 'study_code': Param('study1', type='string'), - 'owner': Param('jmichaud', type='string'), - 'release_id': Param('1', type='string'), - 'dataset': Param('dataset_default', type='string'), - 'batch': Param('annotated_vcf', type='string'), - 'project': Param('cqdg', type='string'), - }, -) as dag: - - def study_id() -> str: - return '{{ params.study_id }}' - - def study_code() -> str: - return '{{ params.study_code }}' - - def project() -> str: - return '{{ params.project }}' - - def release_id() -> str: - return '{{ params.release_id }}' - - def dataset() -> str: - return '{{ params.dataset }}' - - def batch() -> str: - return '{{ params.batch }}' - - def owner() -> str: - return '{{ params.owner }}' - - def spark_config(): - return 'etl-task-xlarge' if env == Env.PROD else 'etl-task-large' - - variant_task_variants = SparkOperator( - task_id='variant-task_snv', - name='etl-variant-task_snv', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.variant_task_jar, - spark_class='bio.ferlab.etl.normalized.RunNormalizedGenomic', - spark_config=spark_config(), - arguments=['snv', - '--config', f'config/{env}-{project()}.conf', - '--steps', 'default', - '--app-name', 'variant_task_consequences', - '--release-id', release_id(), - '--owner', owner(), - '--dataset', dataset(), - '--batch', batch(), - '--study-id', study_id(), - '--study-code', study_code() - ], - ) - - variant_task_consequences = SparkOperator( - task_id='variant_task_consequences', - name='etl-variant_task_consequences', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.variant_task_jar, - spark_class='bio.ferlab.etl.normalized.RunNormalizedGenomic', - spark_config=spark_config(), - arguments=['consequences', - '--config', f'config/{env}-{project()}.conf', - '--steps', 'default', - '--app-name', 'variant_task_consequences', - '--study-id', study_id(), - '--study-code', study_code(), - '--owner', owner(), - '--dataset', dataset(), - '--batch', batch()], - ) - - -variant_task_variants >> variant_task_consequences diff --git a/dags/etl_variants_enrich.py b/dags/etl_variants_enrich.py 
deleted file mode 100644 index 9c6b313..0000000 --- a/dags/etl_variants_enrich.py +++ /dev/null @@ -1,47 +0,0 @@ -from airflow import DAG -from airflow.models.param import Param -from datetime import datetime -from lib import config -from lib.config import env, Env, K8sContext -from lib.operators.spark import SparkOperator - -with DAG( - dag_id='etl_variant_enrich', - start_date=datetime(2022, 1, 1), - schedule_interval=None, - params={ - 'project': Param('cqdg', type='string'), - }, -) as dag: - def project() -> str: - return '{{ params.project }}' - - def spark_config(): - return 'etl-task-xlarge' if env == Env.PROD else 'etl-task-large' - - - variant_task_enrich_variants = SparkOperator( - task_id='variant_task_variant_enrich_snv', - name='etl-variant_task_variant_enrich_snv', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.variant_task_jar, - spark_class='bio.ferlab.etl.enriched.RunEnrichGenomic', - spark_config=spark_config(), - arguments=['variants', - '--config', f'config/{env}-{project()}.conf', - '--steps', 'default'], - ) - - variant_task_enrich_consequences = SparkOperator( - task_id='variant_task_variant_enrich_consequences', - name='etl-variant_task_variant_enrich_consequences', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.variant_task_jar, - spark_class='bio.ferlab.etl.enriched.RunEnrichGenomic', - spark_config=spark_config(), - arguments=['consequences', - '--config', f'config/{env}-{project()}.conf', - '--steps', 'default'], - ) - - variant_task_enrich_variants >> variant_task_enrich_consequences diff --git a/dags/etl_variants_prepared.py b/dags/etl_variants_prepared.py deleted file mode 100644 index 13e13e2..0000000 --- a/dags/etl_variants_prepared.py +++ /dev/null @@ -1,71 +0,0 @@ -from airflow import DAG -from airflow.models.param import Param -from datetime import datetime -from lib import config -from lib.config import env, Env, K8sContext -from lib.operators.spark import SparkOperator - -with DAG( - dag_id='etl_variant_prepared', - start_date=datetime(2022, 1, 1), - schedule_interval=None, - params={ - 'project': Param('cqdg', type='string'), - }, -) as dag: - - def project() -> str: - return '{{ params.project }}' - - def spark_config(): - return 'etl-task-xlarge' if env == Env.PROD else 'etl-task-large' - - variant_task_variant_centric = SparkOperator( - task_id='variant_task_variant_centric', - name='etl-variant_task_variant_centric', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.variant_task_jar, - spark_class='bio.ferlab.etl.prepared.RunPrepared', - spark_config=spark_config(), - arguments=['variant_centric', - '--config', f'config/{env}-{project()}.conf', - '--steps', 'default'], - ) - - variant_task_gene_centric = SparkOperator( - task_id='variant_task_gene_centric', - name='etl-variant_task_gene_centric', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.variant_task_jar, - spark_class='bio.ferlab.etl.prepared.RunPrepared', - spark_config=spark_config(), - arguments=['gene_centric', - '--config', f'config/{env}-{project()}.conf', - '--steps', 'default'], - ) - - variant_task_variant_suggestions = SparkOperator( - task_id='variant_task_variant_suggestions', - name='etl-variant_variant_suggestions', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.variant_task_jar, - spark_class='bio.ferlab.etl.prepared.RunPrepared', - spark_config=spark_config(), - arguments=['variant_suggestions', - '--config', f'config/{env}-{project()}.conf', - '--steps', 'default'], - ) - - variant_task_gene_suggestions = SparkOperator( - 
task_id='variant_task_gene_suggestions', - name='etl-variant_gene_suggestions', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.variant_task_jar, - spark_class='bio.ferlab.etl.prepared.RunPrepared', - spark_config=spark_config(), - arguments=['gene_suggestions', - '--config', f'config/{env}-{project()}.conf', - '--steps', 'default'], - ) - - variant_task_variant_centric >> variant_task_gene_centric >> variant_task_variant_suggestions >> variant_task_gene_suggestions diff --git a/dags/fhavro_export.py b/dags/fhavro_export.py deleted file mode 100644 index 8eee83c..0000000 --- a/dags/fhavro_export.py +++ /dev/null @@ -1,45 +0,0 @@ -from airflow import DAG -from airflow.models.param import Param -from airflow.utils.task_group import TaskGroup -from datetime import datetime -from lib import config -from lib.operators.fhavro import FhavroOperator -from lib.config import env, Env, K8sContext -from lib.operators.spark import SparkOperator -from lib.operators.arranger import ArrangerOperator - -#13 - -with DAG( - dag_id='fhavro_export', - start_date=datetime(2022, 1, 1), - schedule_interval=None, - params={ - 'release_id': Param('7', type='string'), - 'study_ids': Param('ST0000017', type='string'), - 'project': Param('cqdg', type='string'), - }, -) as dag: - def release_id() -> str: - return '{{ params.release_id }}' - - - def study_ids() -> str: - return '{{ params.study_ids }}' - - - def project() -> str: - return '{{ params.project }}' - - - fhavro_export = FhavroOperator( - task_id='fhavro_export', - name='etl-fhavro_export', - k8s_context=K8sContext.DEFAULT, - cmds=['java', - '-cp', - 'fhavro-export.jar', - 'bio/ferlab/fhir/etl/FhavroExport', - release_id(), study_ids(), env - ], - ) diff --git a/dags/import.py b/dags/import.py deleted file mode 100644 index 30b7e63..0000000 --- a/dags/import.py +++ /dev/null @@ -1,36 +0,0 @@ -from airflow import DAG -from airflow.models.param import Param -from datetime import datetime -from lib import config -from lib.config import env, Env, K8sContext -from lib.operators.spark import SparkOperator - -with DAG( - dag_id='import', - start_date=datetime(2022, 1, 1), - schedule_interval=None, - params={ - 'release_id': Param('7', type='string'), - 'study_ids': Param('ST0000017', type='string'), - 'project': Param('cqdg', type='string'), - }, -) as dag: - def release_id() -> str: - return '{{ params.release_id }}' - - - def study_ids() -> str: - return '{{ params.study_ids }}' - - def project() -> str: - return '{{ params.project }}' - - import_task = SparkOperator( - task_id='import_task', - name='etl-import-task', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.spark_import_jar, - spark_class='bio.ferlab.fhir.etl.ImportTask', - spark_config='etl-task-medium', - arguments=[f'config/{env}-{project()}.conf', 'default', release_id(), study_ids()], - ) diff --git a/dags/index.py b/dags/index.py deleted file mode 100644 index 96de32a..0000000 --- a/dags/index.py +++ /dev/null @@ -1,72 +0,0 @@ -from airflow import DAG -from airflow.models.param import Param -from datetime import datetime -from lib import config -from lib.config import env, K8sContext -from lib.operators.spark import SparkOperator - -with DAG( - dag_id='index_task', - start_date=datetime(2022, 1, 1), - schedule_interval=None, - params={ - 'release_id': Param('7', type='string'), - 'study_ids': Param('ST0000017', type='string'), - 'project': Param('cqdg', type='string'), - 'es_port': Param('9200', type='string') - }, -) as dag: - def release_id() -> str: - return '{{ params.release_id }}' - 
- - def study_ids() -> str: - return '{{ params.study_ids }}' - - def project() -> str: - return '{{ params.project }}' - - def es_port() -> str: - return '{{ params.es_port }}' - - study_centric = SparkOperator( - task_id='study_centric', - name='etl-index-study-centric', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.spark_index_jar, - spark_class='bio.ferlab.fhir.etl.IndexTask', - spark_config='etl-task-small', - arguments=[release_id(), study_ids(), 'study_centric', env, project(), config.es_url, es_port()], - ) - - participant_centric = SparkOperator( - task_id='participant_centric', - name='etl-index-participant-centric', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.spark_index_jar, - spark_class='bio.ferlab.fhir.etl.IndexTask', - spark_config='etl-task-medium', - arguments=[release_id(), study_ids(), 'participant_centric', env, project(), config.es_url, es_port()], - ) - - file_centric = SparkOperator( - task_id='file_centric', - name='etl-index-file-centric', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.spark_index_jar, - spark_class='bio.ferlab.fhir.etl.IndexTask', - spark_config='etl-task-medium', - arguments=[release_id(), study_ids(), 'file_centric', env, project(), config.es_url, es_port()], - ) - - biospecimen_centric = SparkOperator( - task_id='biospecimen_centric', - name='etl-index-biospecimen-centric', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.spark_index_jar, - spark_class='bio.ferlab.fhir.etl.IndexTask', - spark_config='etl-task-medium', - arguments=[release_id(), study_ids(), 'biospecimen_centric', env, project(), config.es_url, es_port()], - ) - - participant_centric >> study_centric >> file_centric >> biospecimen_centric diff --git a/dags/lib/config.py b/dags/lib/config.py index 457b62a..e3fb7ff 100644 --- a/dags/lib/config.py +++ b/dags/lib/config.py @@ -1,104 +1,159 @@ import kubernetes from airflow.exceptions import AirflowConfigException from airflow.models import Variable, Param - +from lib.operators.spark import SparkOperatorConfig +from lib.operators.base_kubernetes import KubeConfig +from lib.operators.fhavro import FhavroConfig +from lib.operators.fhir_import import FhirCsvConfig class Env: QA = 'qa' DEV = 'dev' PROD = 'prod' +env = Variable.get('environment') -class K8sContext: - DEFAULT = 'default' - ETL = 'etl' +es_url = Variable.get('es_url') +es_port = Variable.get('es_port', '9200') +keycloak_url = Variable.get('keycloak_url') +fhir_url = Variable.get('fhir_url') +aws_secret_name = 'ceph-s3-credentials' +aws_secret_access_key = 'access' +aws_secret_secret_key = 'secret' -env = Variable.get('environment') -k8s_namespace = Variable.get('kubernetes_namespace') -k8s_context = { - K8sContext.DEFAULT: Variable.get('kubernetes_context_default', None), - K8sContext.ETL: Variable.get('kubernetes_context_etl', None), -} -base_url = Variable.get('base_url', None) -s3_conn_id = Variable.get('s3_conn_id', None) -show_test_dags = Variable.get('show_test_dags', None) == 'yes' - -fhavro_export_image = 'ferlabcrsj/fhavro-export:86852f678cf568d453d6600ce56735c76ef946d2-1708714675' -spark_image = 'ferlabcrsj/spark:6916df9ea76364939be282f32a5b2ddacdb3526e' -arranger_image = 'ferlabcrsj/cqdg-api-arranger:1.3.2' -cqdg_fhir_import = 'ferlabcrsj/cqdg-fhir-import' -obo_parser_jar_version = 'v1.0.9' -jar_version = 'v2.21.5' -spark_service_account = 'spark' +aws_endpoint = Variable.get('object_store_url') +hive_metastore_uri=Variable.get('hive_metastore_uri') + +datalake_bucket = Variable.get('datalake_bucket') +clinical_data_bucket = 
Variable.get('clinical_data_bucket') +file_import_bucket = Variable.get('file_import_bucket') + +keycloak_client_secret_name = 'keycloak-client-system-credentials' + +es_credentials_secret_name = 'opensearch-dags-credentials' +es_credentials_secret_key_username = 'username' +es_credentials_secret_key_password = 'password' default_params = { 'study_id': Param('ST0000017', type='string'), 'project': Param('cqdg', type='string'), } + study_id = '{{ params.study_id }}' +study_ids = '{{ params.study_ids }}' project = '{{ params.project }}' +project = '{{ params.project }}' +dataset ='{{ params.dataset }}' +batch = '{{ params.batch }}' +release_id ='{{ params.release_id }}' default_config_file = f'config/{env}-{project}.conf' -if env == Env.QA: - es_url = 'https://workers.search.qa.juno.cqdg.ferlab.bio' - spark_import_jar = f'https://github.com/Ferlab-Ste-Justine/etl-cqdg-portal/releases/download/{jar_version}/import-task.jar' - spark_prepare_index_jar = f'https://github.com/Ferlab-Ste-Justine/etl-cqdg-portal/releases/download/{jar_version}/prepare-index.jar' - spark_index_jar = f'https://github.com/Ferlab-Ste-Justine/etl-cqdg-portal/releases/download/{jar_version}/index-task.jar' - spark_publish_jar = f'https://github.com/Ferlab-Ste-Justine/etl-cqdg-portal/releases/download/{jar_version}/publish-task.jar' - variant_task_jar = f'https://github.com/Ferlab-Ste-Justine/etl-cqdg-portal/releases/download/{jar_version}/variant-task.jar' - fhir_url = 'http://fhir-server:8080/fhir' - keycloak_url = 'http://keycloak-http' - ca_certificates = 'ingress-ca-certificate' - minio_certificate = 'minio-ca-certificate' -elif env == Env.DEV: - es_url = 'https://workers.search.qa.juno.cqdg.ferlab.bio' - spark_import_jar = f'https://github.com/Ferlab-Ste-Justine/etl-cqdg-portal/releases/download/{jar_version}/import-task.jar' - spark_prepare_index_jar = f'https://github.com/Ferlab-Ste-Justine/etl-cqdg-portal/releases/download/{jar_version}/prepare-index.jar' - spark_index_jar = f'https://github.com/Ferlab-Ste-Justine/etl-cqdg-portal/releases/download/{jar_version}/index-task.jar' - spark_publish_jar = f'https://github.com/Ferlab-Ste-Justine/etl-cqdg-portal/releases/download/{jar_version}/publish-task.jar' - variant_task_jar = f'https://github.com/Ferlab-Ste-Justine/etl-cqdg-portal/releases/download/{jar_version}/variant-task.jar' - fhir_url = 'http://fhir-server:8080/fhir' - keycloak_url = 'http://keycloak-http' - ca_certificates = 'ingress-ca-certificate' - minio_certificate = 'minio-ca-certificate' -elif env == Env.PROD: - es_url = 'https://workers.search.prod.juno.cqdg.ferlab.bio' - spark_import_jar = f'https://github.com/Ferlab-Ste-Justine/etl-cqdg-portal/releases/download/{jar_version}/import-task.jar' - spark_prepare_index_jar = f'https://github.com/Ferlab-Ste-Justine/etl-cqdg-portal/releases/download/{jar_version}/prepare-index.jar' - spark_index_jar = f'https://github.com/Ferlab-Ste-Justine/etl-cqdg-portal/releases/download/{jar_version}/index-task.jar' - spark_publish_jar = f'https://github.com/Ferlab-Ste-Justine/etl-cqdg-portal/releases/download/{jar_version}/publish-task.jar' - variant_task_jar = f'https://github.com/Ferlab-Ste-Justine/etl-cqdg-portal/releases/download/{jar_version}/variant-task.jar' - fhir_url = 'http://fhir-server:8080/fhir' - keycloak_url = 'http://keycloak-http' - ca_certificates = 'ingress-ca-certificate' - minio_certificate = 'minio-ca-certificate' -else: - raise AirflowConfigException(f'Unexpected environment "{env}"') - -obo_parser_jar = 
f'https://github.com/Ferlab-Ste-Justine/obo-parser/releases/download/{obo_parser_jar_version}/obo-parser.jar' - -def env_url(prefix: str = '') -> str: - return f'{prefix}{env}' if env in [Env.QA, Env.DEV] else '' - - -def k8s_in_cluster(context: str) -> bool: - return not k8s_context[context] - - -def k8s_config_file(context: str) -> str: - return None if not k8s_context[context] else '~/.kube/config' - - -def k8s_cluster_context(context: str) -> str: - return k8s_context[context] - - -def k8s_load_config(context: str) -> None: - if not k8s_context[context]: - kubernetes.config.load_incluster_config() - else: - kubernetes.config.load_kube_config( - config_file=k8s_config_file(context), - context=k8s_context[context], - ) +kube_config = KubeConfig( + in_cluster = Variable.get('k8s_in_cluster', True), + namespace = Variable.get('k8s_namespace', None), + service_account_name = Variable.get('k8s_service_account_name'), + image_pull_secrets_name = Variable.get('k8s_image_pull_secret_name', None), +) + +spark_default_conf = { + 'spark.jars.packages': 'org.apache.hadoop:hadoop-aws:3.3.4,io.delta:delta-core_2.12:2.4.0', + 'spark.sql.shuffle.partitions' : '1000', + 'spark.sql.extensions' : 'io.delta.sql.DeltaSparkSessionExtension', + 'spark.sql.catalog.spark_catalog' : 'org.apache.spark.sql.delta.catalog.DeltaCatalog', + 'spark.hadoop.fs.s3a.impl' : 'org.apache.hadoop.fs.s3a.S3AFileSystem', + 'spark.hadoop.fs.s3a.fast.upload' : 'true', + 'spark.hadoop.fs.s3a.connection.ssl.enabled' : 'true', + 'spark.hadoop.fs.s3a.path.style.access' : 'true', + 'spark.hadoop.fs.s3a.endpoint' : aws_endpoint, + 'spark.hadoop.fs.s3a.aws.credentials.provider' : 'com.amazonaws.auth.EnvironmentVariableCredentialsProvider', + 'spark.kubernetes.driver.secretKeyRef.AWS_ACCESS_KEY_ID' : f'{aws_secret_name}:{aws_secret_access_key}', + 'spark.kubernetes.driver.secretKeyRef.AWS_SECRET_ACCESS_KEY' : f'{aws_secret_name}:{aws_secret_secret_key}', + 'spark.kubernetes.executor.secretKeyRef.AWS_ACCESS_KEY_ID' : f'{aws_secret_name}:{aws_secret_access_key}', + 'spark.kubernetes.executor.secretKeyRef.AWS_SECRET_ACCESS_KEY' : f'{aws_secret_name}:{aws_secret_secret_key}', + 'spark.hadoop.hive.metastore.uris' : hive_metastore_uri, + 'spark.sql.warehouse.dir' : f's3a://{datalake_bucket}/hive', + 'spark.eventLog.enabled' : 'true', + 'spark.eventLog.dir' : f's3a://{datalake_bucket}/spark-logs', + 'spark.driver.extraJavaOptions' : '"-Divy.cache.dir=/tmp -Divy.home=/tmp"', + 'spark.jars.ivy': '/tmp' +} + +spark_small_conf = { + 'spark.driver.memory' : '16g', + 'spark.driver.cores' : '6', + 'spark.executor.instances' : '1', + 'spark.executor.memory' : '16g', + 'spark.memory.fraction' : '0.9', + 'spark.memory.storageFraction' : '0.1', + 'spark.executor.cores' : '12', + 'spark.kubernetes.executor.volumes.emptyDir.spark-local-dir-1.options.sizeLimit' : '50Gi', + 'spark.kubernetes.executor.volumes.emptyDir.spark-local-dir-1.mount.path' : '/data', + 'spark.kubernetes.executor.volumes.emptyDir.spark-local-dir-1.mount.readOnly' : 'false', +} + +spark_medium_conf = { + 'spark.driver.memory' : '16g', + 'spark.driver.cores' : '6', + 'spark.executor.instances' : '2', + 'spark.executor.memory' : '16g', + 'spark.memory.fraction' : '0.9', + 'spark.memory.storageFraction' : '0.1', + 'spark.executor.cores' : '12', + 'spark.kubernetes.executor.volumes.emptyDir.spark-local-dir-1.options.sizeLimit' : '350Gi', + 'spark.kubernetes.executor.volumes.emptyDir.spark-local-dir-1.mount.path' : '/data', + 
'spark.kubernetes.executor.volumes.emptyDir.spark-local-dir-1.mount.readOnly' : 'false', +} + +spark_large_conf = { + 'spark.driver.memory' : '70g', + 'spark.driver.cores' : '12', + 'spark.executor.instances' : '20', + 'spark.executor.memory' : '64g', + 'spark.memory.fraction' : '0.9', + 'spark.memory.storageFraction' : '0.1', + 'spark.executor.cores' : '12', + 'spark.kubernetes.executor.volumes.emptyDir.spark-local-dir-1.options.sizeLimit' : '350Gi', + 'spark.kubernetes.executor.volumes.emptyDir.spark-local-dir-1.mount.path' : '/data', + 'spark.kubernetes.executor.volumes.emptyDir.spark-local-dir-1.mount.readOnly' : 'false', +} +spark_index_conf = { + 'spark.kubernetes.driver.podTemplateFile': 'local:///app/pod-template-es-cert.yml', + 'spark.driver.extraJavaOptions': '-Divy.cache.dir=/tmp -Divy.home=/tmp -Djavax.net.ssl.trustStore=/opt/keystores/truststore.p12 -Djavax.net.ssl.trustStorePassword=changeit', + 'spark.kubernetes.driver.secretKeyRef.ES_USERNAME': f'{es_credentials_secret_name}:{es_credentials_secret_key_username}', + 'spark.kubernetes.driver.secretKeyRef.ES_PASSWORD': f'{es_credentials_secret_name}:{es_credentials_secret_key_password}', + 'spark.kubernetes.executor.podTemplateFile': 'local:///app/pod-template-es-cert.yml', + 'spark.executor.extraJavaOptions': '-Divy.cache.dir=/tmp -Divy.home=/tmp -Djavax.net.ssl.trustStore=/opt/keystores/truststore.p12 -Djavax.net.ssl.trustStorePassword=changeit', + 'spark.kubernetes.executor.secretKeyRef.ES_USERNAME': f'{es_credentials_secret_name}:{es_credentials_secret_key_username}', + 'spark.kubernetes.executor.secretKeyRef.ES_PASSWORD': f'{es_credentials_secret_name}:{es_credentials_secret_key_password}' +} + +variant_jar = 'local:///app/variant-task.jar' +prepare_index_jar = 'local:///app/prepare-index.jar' +import_jar = 'local:///app/import-task.jar' +index_jar = 'local:///app/index-task.jar' +publish_jar = 'local:///app/publish-task.jar' + +etl_base_config = SparkOperatorConfig( + spark_configs=[spark_default_conf], + image = Variable.get('etl_image'), + kube_config=kube_config, + is_delete_operator_pod=False +) + +etl_index_config = etl_base_config \ + .add_spark_conf(spark_small_conf, spark_index_conf) \ + .with_spark_jar(index_jar) + +etl_publish_config = etl_base_config \ + .add_spark_conf(spark_small_conf, spark_index_conf) \ + .with_spark_jar(publish_jar) \ + .with_spark_class('bio.ferlab.fhir.etl.PublishTask') + +etl_variant_config = etl_base_config \ + .add_spark_conf(spark_large_conf) \ + .with_spark_jar(variant_jar) + + + + diff --git a/dags/lib/operators/arranger.py b/dags/lib/operators/arranger.py index b7d6199..7da4c60 100644 --- a/dags/lib/operators/arranger.py +++ b/dags/lib/operators/arranger.py @@ -1,133 +1,132 @@ -from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator from kubernetes.client import models as k8s -from lib import config -from lib.config import env -import time +from typing import Optional, List, Type +from dataclasses import dataclass +from lib.operators.base_kubernetes import BaseKubernetesOperator, BaseConfig, required -from dags.lib.config import Env - - -class ArrangerOperator(KubernetesPodOperator): +class ArrangerOperator(BaseKubernetesOperator): + template_fields = [*BaseKubernetesOperator.template_fields, 'node_environment', 'es_url', + 'es_port', 'es_cert_secret_name', 'es_credentials_secret_name', 'es_credentials_secret_key_username', 'es_credentials_secret_key_password', + 'keycloak_client_secret_name', 'keycloak_client_secret_key' + ]
def __init__( self, - k8s_context: str, + node_environment: str, + es_url: str, + es_port: Optional[str] = '9200', + es_cert_secret_name: Optional[str] = None, + es_cert_file: Optional[str] = 'ca.crt', + es_credentials_secret_name: Optional[str] = None, + es_credentials_secret_key_username: Optional[str] = 'username', + es_credentials_secret_key_password: Optional[str] = 'password', + keycloak_client_secret_name: Optional[str] = None, + keycloak_client_secret_key: Optional[str] = 'client-secret', **kwargs, ) -> None: super().__init__( - is_delete_operator_pod=True, - in_cluster=config.k8s_in_cluster(k8s_context), - cluster_context=config.k8s_cluster_context(k8s_context), - namespace=config.k8s_namespace, - image=config.arranger_image, - **kwargs, + **kwargs ) - + self.node_environment=node_environment + self.es_url=es_url + self.es_port=es_port + self.es_cert_secret_name=es_cert_secret_name + self.es_cert_file=es_cert_file + self.es_credentials_secret_name=es_credentials_secret_name + self.es_credentials_secret_key_username=es_credentials_secret_key_username + self.es_credentials_secret_key_password=es_credentials_secret_key_password + self.keycloak_client_secret_name=keycloak_client_secret_name + self.keycloak_client_secret_key=keycloak_client_secret_key + def execute(self, **kwargs): - self.image_pull_secrets = [ - k8s.V1LocalObjectReference( - name='images-registry-credentials', - ), - ] - self.env_vars = [ - k8s.V1EnvVar( - name='ES_HOST', - value="{0}:9200".format(config.es_url), - ), - k8s.V1EnvVar( - name='NODE_EXTRA_CA_CERTS', - value='/opt/opensearch-ca/ca.crt', - ), + + self.env_vars.append( k8s.V1EnvVar( - name='ES_USER', - value_from=k8s.V1EnvVarSource( - secret_key_ref=k8s.V1SecretKeySelector( - name='opensearch-dags-credentials', - key='username', - ), - ), - ), + name='NODE_ENV', + value=self.node_environment, + ) + ) + self.env_vars.append( k8s.V1EnvVar( - name='ES_PASS', - value_from=k8s.V1EnvVarSource( - secret_key_ref=k8s.V1SecretKeySelector( - name='opensearch-dags-credentials', - key='password', - ), - ), - ), - ] - - self.volumes = [] - self.volume_mounts = [] + name='ES_HOST', + value=f"{self.es_url}:{self.es_port}", + ), + ) - if env in [Env.PROD]: + if self.es_credentials_secret_name: self.env_vars.append( - k8s.V1EnvVar( - name='NODE_ENV', - value='production', + k8s.V1EnvVar( + name='ES_USER', + value_from=k8s.V1EnvVarSource( + secret_key_ref = k8s.V1SecretKeySelector( + name=self.es_credentials_secret_name, + key=self.es_credentials_secret_key_username) + ) ) ) self.env_vars.append( k8s.V1EnvVar( - name='KEYCLOAK_CLIENT_SECRET', + name='ES_PASS', value_from=k8s.V1EnvVarSource( secret_key_ref=k8s.V1SecretKeySelector( - name='keycloak-client-system-credentials', - key='client-secret', - ), - ), - ) - ) - self.volumes.append( - k8s.V1Volume( - name='opensearch-ca-certificate', - secret=k8s.V1SecretVolumeSource( - secret_name='opensearch-ca-certificate', - default_mode=0o555 - ), - ), - ) - self.volume_mounts.append( - k8s.V1VolumeMount( - name='opensearch-ca-certificate', - mount_path='/opt/opensearch-ca', - read_only=True, - ), - ) - else: - self.env_vars.append( - k8s.V1EnvVar( - name='NODE_ENV', - value='qa', + name=self.es_credentials_secret_name, + key=self.es_credentials_secret_key_password) + ) ) ) + + if self.keycloak_client_secret_name: self.env_vars.append( k8s.V1EnvVar( name='KEYCLOAK_CLIENT_SECRET', value_from=k8s.V1EnvVarSource( secret_key_ref=k8s.V1SecretKeySelector( - name='keycloak-client-system-credentials', - key='client-secret', + 
name=self.keycloak_client_secret_name, + key=self.keycloak_client_secret_key, ), ), ) - ) + ) + + if self.es_cert_secret_name: self.volumes.append( k8s.V1Volume( - name='opensearch-ca-certificate', + name=self.es_cert_secret_name, secret=k8s.V1SecretVolumeSource( secret_name='opensearch-ca-certificate', default_mode=0o555 ), ), - ) + ) self.volume_mounts.append( k8s.V1VolumeMount( - name='opensearch-ca-certificate', + name=self.es_cert_secret_name, mount_path='/opt/opensearch-ca', read_only=True, ), - ) + ) + self.env_vars.append( + k8s.V1EnvVar( + name='NODE_EXTRA_CA_CERTS', + value=f'/opt/opensearch-ca/{self.es_cert_file}', + ) + ) + self.cmds = ['node'] super().execute(**kwargs) + + +@dataclass +class ArrangerConfig(BaseConfig): + node_environment: str = required() # we need a default value because BaseConfig has some default fields. See https://stackoverflow.com/questions/51575931/class-inheritance-in-python-3-7-dataclasses + es_url: Optional[str] = required() + es_port: Optional[str] = '9200' + es_cert_secret_name: Optional[str] = None + es_cert_file: Optional[str] = 'ca.crt' + es_credentials_secret_name: Optional[str] = None + es_credentials_secret_key_username: Optional[str] = 'username' + es_credentials_secret_key_password: Optional[str] = 'password' + keycloak_client_secret_name: Optional[str] = None + keycloak_client_secret_key: Optional[str] = 'client-secret' + + def operator(self, class_to_instantiate: Type[ArrangerOperator] = ArrangerOperator,**kwargs) -> ArrangerOperator: + return super().build_operator(class_to_instantiate=class_to_instantiate, **kwargs) \ No newline at end of file diff --git a/dags/lib/operators/base_kubernetes.py b/dags/lib/operators/base_kubernetes.py new file mode 100644 index 0000000..8b98f95 --- /dev/null +++ b/dags/lib/operators/base_kubernetes.py @@ -0,0 +1,84 @@ +from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator +from typing import Optional, List, Type, TypeVar +from typing_extensions import Self +from kubernetes.client import models as k8s +import copy +from dataclasses import dataclass, field, asdict +@dataclass +class KubeConfig: + in_cluster: bool = True, + cluster_context: Optional[str] = None + namespace: Optional[str] = None + service_account_name: Optional[str] = None + image_pull_secrets_name: Optional[str] = None + + + +class BaseKubernetesOperator(KubernetesPodOperator): + template_fields = [*KubernetesPodOperator.template_fields, 'image_pull_secrets_name'] + def __init__( + self, + image_pull_secrets_name: Optional[str] = None, + **kwargs + ) -> None: + super().__init__( + **kwargs + ) + self.image_pull_secrets_name=image_pull_secrets_name + + def execute(self, **kwargs): + + if self.image_pull_secrets_name: + self.image_pull_secrets = [ + k8s.V1LocalObjectReference( + name = self.image_pull_secrets_name, + ), + ] + super().execute(**kwargs) + +T = TypeVar("T") + +def required() -> T: + f: T + + def factory() -> T: + # mypy treats a Field as a T, even though it has attributes like .name, .default, etc + field_name = f.name # type: ignore[attr-defined] + raise ValueError(f"field '{field_name}' required") + + f = field(default_factory=factory) + return f + +@dataclass +class BaseConfig: + kube_config: KubeConfig + is_delete_operator_pod: bool = True + image: Optional[str] = None + arguments: List[str] = field(default_factory=list) + + def args(self, *new_args) -> Self: + c = copy.copy(self) + c.arguments = [*self.arguments, *new_args] + return c + + def prepend_args(self, *new_args) -> Self: + c = 
copy.copy(self) + c.arguments = [*new_args, *self.arguments] + return c + + def build_operator(self, class_to_instantiate: Type[BaseKubernetesOperator], **kwargs) -> BaseKubernetesOperator: + this_params = asdict(self) + this_params.pop('kube_config', None) + params = {**this_params, **kwargs} + return class_to_instantiate( + in_cluster = self.kube_config.in_cluster, + cluster_context = self.kube_config.cluster_context, + namespace = self.kube_config.namespace, + service_account_name = self.kube_config.service_account_name, + **params + ) + + def with_image(self, new_image) -> Self: + c = copy.copy(self) + c.image = new_image + return c \ No newline at end of file diff --git a/dags/lib/operators/fhavro.py b/dags/lib/operators/fhavro.py index 3349b55..c06516d 100644 --- a/dags/lib/operators/fhavro.py +++ b/dags/lib/operators/fhavro.py @@ -1,82 +1,128 @@ -from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator +from dataclasses import dataclass +from lib.operators.base_kubernetes import BaseKubernetesOperator, BaseConfig, required from kubernetes.client import models as k8s -from lib import config -from lib.config import env +from typing import Optional, Type -class FhavroOperator(KubernetesPodOperator): +class FhavroOperator(BaseKubernetesOperator): def __init__( self, - k8s_context: str, + fhir_url: str, + keycloak_url: str, + keycloak_client_secret_name: str, + bucket_name:str, + aws_endpoint:Optional[str] = None, + aws_credentials_secret_name: Optional[str] = None, + aws_credentials_secret_access_key: str = 'access', + aws_credentials_secret_secret_key: str = 'secret', + aws_region: str = 'us-east-1', + aws_access_path_style: bool = True, + keycloak_client_secret_key: Optional[str] = 'client-secret', **kwargs, ) -> None: super().__init__( - is_delete_operator_pod=True, - in_cluster=config.k8s_in_cluster(k8s_context), - cluster_context=config.k8s_cluster_context(k8s_context), - namespace=config.k8s_namespace, - image=config.fhavro_export_image, - **kwargs, + **kwargs ) + self.fhir_url = fhir_url + self.keycloak_url = keycloak_url + self.bucket_name = bucket_name + self.aws_access_path_style = aws_access_path_style + self.aws_endpoint=aws_endpoint + self.aws_credentials_secret_name = aws_credentials_secret_name + self.aws_credentials_secret_access_key = aws_credentials_secret_access_key + self.aws_credentials_secret_secret_key = aws_credentials_secret_secret_key + self.aws_region = aws_region + self.keycloak_client_secret_name = keycloak_client_secret_name + self.keycloak_client_secret_key = keycloak_client_secret_key + + def execute(self, **kwargs): - self.image_pull_secrets = [ - k8s.V1LocalObjectReference( - name='images-registry-credentials', - ), - ] - self.env_vars = [ - k8s.V1EnvVar( - name='AWS_ENDPOINT', - value='https://objets.juno.calculquebec.ca', - ), - k8s.V1EnvVar( - name='AWS_ACCESS_KEY_ID', - value_from=k8s.V1EnvVarSource( - secret_key_ref=k8s.V1SecretKeySelector( - name='ceph-s3-credentials', - key='access', + if self.aws_endpoint: + self.env_vars.append( + k8s.V1EnvVar( + name='AWS_ENDPOINT', + value=self.aws_endpoint, + ) + ) + if self.aws_credentials_secret_name: + self.env_vars.append( + k8s.V1EnvVar( + name='AWS_ACCESS_KEY_ID', + value_from=k8s.V1EnvVarSource( + secret_key_ref=k8s.V1SecretKeySelector( + name=self.aws_credentials_secret_name, + key=self.aws_credentials_secret_access_key, + ), ), ), - ), - k8s.V1EnvVar( - name='AWS_SECRET_ACCESS_KEY', - value_from=k8s.V1EnvVarSource( - secret_key_ref=k8s.V1SecretKeySelector( - 
name='ceph-s3-credentials', - key='secret', + ) + self.env_vars.append( + k8s.V1EnvVar( + name='AWS_SECRET_ACCESS_KEY', + value_from=k8s.V1EnvVarSource( + secret_key_ref=k8s.V1SecretKeySelector( + name=self.aws_credentials_secret_name, + key=self.aws_credentials_secret_secret_key, + ), ), - ), - ), + ) + ) + + env_vars = [ k8s.V1EnvVar( name='AWS_REGION', - value='us-east-1', + value=self.aws_region, ), k8s.V1EnvVar( name='KEYCLOAK_CLIENT_SECRET', value_from=k8s.V1EnvVarSource( secret_key_ref=k8s.V1SecretKeySelector( - name='keycloak-client-system-credentials', - key='client-secret', + name=self.keycloak_client_secret_name, + key=self.keycloak_client_secret_key, ), ), ), k8s.V1EnvVar( name='KEYCLOAK_URL', - value=config.keycloak_url, + value=self.keycloak_url, ), k8s.V1EnvVar( name='FHIR_URL', - value=config.fhir_url, + value=self.fhir_url, ), k8s.V1EnvVar( - name='AWS_PATH_ACCESS_STYLE', - value='true' + name = 'AWS_PATH_ACCESS_STYLE', + value = 'true' if self.aws_access_path_style else 'false' ), k8s.V1EnvVar( name='BUCKET_NAME', - value=f'cqdg-{env}-app-datalake', - ), + value=self.bucket_name, + ) ] + self.env_vars = [*self.env_vars, *env_vars] + self.cmds=['java', + '-cp', + 'fhavro-export.jar', + 'bio/ferlab/fhir/etl/FhavroExport' + ] super().execute(**kwargs) + +@dataclass +class FhavroConfig(BaseConfig): + fhir_url: str = required() + keycloak_url: str = required() + keycloak_client_secret_name: str = required() + bucket_name:str = required() + aws_endpoint:Optional[str] = None + aws_credentials_secret_name: Optional[str] = None + aws_credentials_secret_access_key: str = 'access' + aws_credentials_secret_secret_key: str = 'secret' + aws_region: str = 'us-east-1' + aws_access_path_style: bool = True + keycloak_client_secret_key: Optional[str] = 'client-secret' + + def operator(self, class_to_instantiate: Type[FhavroOperator] = FhavroOperator,**kwargs) -> FhavroOperator: + return super().build_operator(class_to_instantiate=class_to_instantiate, **kwargs) + diff --git a/dags/lib/operators/fhir.py b/dags/lib/operators/fhir.py deleted file mode 100644 index 1381e31..0000000 --- a/dags/lib/operators/fhir.py +++ /dev/null @@ -1,47 +0,0 @@ -from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator -from kubernetes.client import models as k8s -from lib import config - - -class FhirOperator(KubernetesPodOperator): - def __init__( - self, - k8s_context: str, - color: str = '', - **kwargs, - ) -> None: - super().__init__( - is_delete_operator_pod=True, - in_cluster=config.k8s_in_cluster(k8s_context), - config_file=config.k8s_config_file(k8s_context), - cluster_context=config.k8s_cluster_context(k8s_context), - namespace=config.k8s_namespace, - image=config.cqdg_fhir_import, - **kwargs, - ) - self.color = color - - def execute(self, **kwargs): - - self.image_pull_secrets = [ - k8s.V1LocalObjectReference( - name='images-registry-credentials', - ), - ] - self.env_vars = [ - k8s.V1EnvVar( - name='FHIR_URL', - value='http://fhir-server:8080/fhir', - ), - k8s.V1EnvVar( - name='KEYCLOAK_CLIENT_SECRET', - value_from=k8s.V1EnvVarSource( - secret_key_ref=k8s.V1SecretKeySelector( - name='keycloak-client-system-credentials', - key='client-secret', - ), - ), - ), - ] - - super().execute(**kwargs) diff --git a/dags/lib/operators/fhir_import.py b/dags/lib/operators/fhir_import.py index 35e2d2f..cdbc41b 100644 --- a/dags/lib/operators/fhir_import.py +++ b/dags/lib/operators/fhir_import.py @@ -1,86 +1,123 @@ -from airflow.providers.cncf.kubernetes.operators.kubernetes_pod 
import KubernetesPodOperator +from dataclasses import dataclass +from lib.operators.base_kubernetes import BaseKubernetesOperator, BaseConfig, required from kubernetes.client import models as k8s -from lib import config -from lib.config import env +from typing import Optional, Type - -class FhirCsvOperator(KubernetesPodOperator): - - template_fields = KubernetesPodOperator.template_fields +class FhirCsvOperator(BaseKubernetesOperator): def __init__( - self, - k8s_context: str, - **kwargs, - ) -> None: - super().__init__( - is_delete_operator_pod=True, - in_cluster=config.k8s_in_cluster(k8s_context), - cluster_context=config.k8s_cluster_context(k8s_context), - namespace=config.k8s_namespace, - image=config.cqdg_fhir_import, + self, + fhir_url: str, + keycloak_url: str, + id_service_url: str, + keycloak_client_secret_name: str, + clinical_data_bucket_name:str, + file_import_bucket_name:str, + aws_endpoint:Optional[str] = None, + aws_credentials_secret_name: Optional[str] = None, + aws_credentials_secret_access_key: str = 'access', + aws_credentials_secret_secret_key: str = 'secret', + aws_access_path_style: bool = True, + keycloak_client_secret_key: Optional[str] = 'client-secret', **kwargs, - ) + ) -> None: + super().__init__(**kwargs) + self.fhir_url = fhir_url + self.keycloak_url = keycloak_url + self.id_service_url = id_service_url + self.clinical_data_bucket_name = clinical_data_bucket_name + self.file_import_bucket_name = file_import_bucket_name + self.aws_access_path_style = aws_access_path_style + self.aws_endpoint=aws_endpoint + self.aws_credentials_secret_name = aws_credentials_secret_name + self.aws_credentials_secret_access_key = aws_credentials_secret_access_key + self.aws_credentials_secret_secret_key = aws_credentials_secret_secret_key + self.keycloak_client_secret_name = keycloak_client_secret_name + self.keycloak_client_secret_key = keycloak_client_secret_key + def execute(self, **kwargs): - self.image_pull_secrets = [ - k8s.V1LocalObjectReference( - name='images-registry-credentials', - ), - ] - self.env_vars = [ - k8s.V1EnvVar( - name='AWS_ACCESS_KEY', - value_from=k8s.V1EnvVarSource( - secret_key_ref=k8s.V1SecretKeySelector( - name='ceph-s3-credentials', - key='access', + if self.config.aws_endpoint: + self.env_vars.append( + k8s.V1EnvVar( + name='AWS_ENDPOINT', + value=self.config.aws_endpoint, + ) + ) + if self.config.aws_credentials_secret_name: + self.env_vars.append( + k8s.V1EnvVar( + name='AWS_ACCESS_KEY_ID', + value_from=k8s.V1EnvVarSource( + secret_key_ref=k8s.V1SecretKeySelector( + name=self.config.aws_credentials_secret_name, + key=self.config.aws_credentials_secret_access_key, + ), ), ), - ), - k8s.V1EnvVar( - name='AWS_SECRET_KEY', - value_from=k8s.V1EnvVarSource( - secret_key_ref=k8s.V1SecretKeySelector( - name='ceph-s3-credentials', - key='secret', + ) + self.env_vars.append( + k8s.V1EnvVar( + name='AWS_SECRET_ACCESS_KEY', + value_from=k8s.V1EnvVarSource( + secret_key_ref=k8s.V1SecretKeySelector( + name=self.config.aws_credentials_secret_name, + key=self.config.aws_credentials_secret_secret_key, + ), ), - ), - ), + ) + ) + self.env_vars = [ + *self.env_vars, k8s.V1EnvVar( name='S3_CLINICAL_DATA_BUCKET_NAME', - value=f'cqdg-{env}-app-clinical-data-service', - ), - k8s.V1EnvVar( - name='AWS_ENDPOINT', - value='https://objets.juno.calculquebec.ca', + value=self.config.clinical_data_bucket_name, ), + k8s.V1EnvVar( name='FHIR_URL', - value='http://fhir-server:8080/fhir', + value=self.config.fhir_url, ), k8s.V1EnvVar( name='ID_SERVICE_HOST', - 
value='http://id-service:5000', + value=self.config.id_service_url, ), k8s.V1EnvVar( name='S3_FILE_IMPORT_BUCKET', - value=f'cqdg-{env}-file-import', + value=self.config.file_import_bucket_name, ), k8s.V1EnvVar( name='KEYCLOAK_CLIENT_SECRET', value_from=k8s.V1EnvVarSource( secret_key_ref=k8s.V1SecretKeySelector( - name='keycloak-client-system-credentials', - key='client-secret', + name=self.config.keycloak_client_secret_name, + key=self.config.keycloak_client_secret_key, ), ), ), k8s.V1EnvVar( name='KEYCLOAK_URL', - value='http://keycloak-http', + value=self.config.keycloak_url, ), ] - self.cmds = ['java'] + self.cmds = ['java', '-cp', 'cqdg-fhir-import.jar'] super().execute(**kwargs) + +@dataclass +class FhirCsvConfig(BaseConfig): + fhir_url: str = required() + keycloak_url: str = required() + id_service_url: str = required() + keycloak_client_secret_name: str = required() + clinical_data_bucket_name:str = required() + file_import_bucket_name:str = required() + aws_endpoint:Optional[str] = None + aws_credentials_secret_name: Optional[str] = None + aws_credentials_secret_access_key: str = 'access' + aws_credentials_secret_secret_key: str = 'secret' + aws_access_path_style: bool = True + keycloak_client_secret_key: Optional[str] = 'client-secret' + + def operator(self, class_to_instantiate: Type[FhirCsvOperator] = FhirCsvOperator,**kwargs) -> FhirCsvOperator: + return super().build_operator(class_to_instantiate=class_to_instantiate, **kwargs) \ No newline at end of file diff --git a/dags/lib/operators/spark.py b/dags/lib/operators/spark.py index 46633d1..f08b046 100644 --- a/dags/lib/operators/spark.py +++ b/dags/lib/operators/spark.py @@ -1,173 +1,112 @@ -import time - import kubernetes import logging from airflow.exceptions import AirflowFailException from airflow.exceptions import AirflowSkipException -from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator from kubernetes.client import models as k8s -from lib import config -from lib.config import env -from typing import List - -from dags.lib.config import Env - - -class SparkOperator(KubernetesPodOperator): - +from typing import Optional, List, Type +from collections import ChainMap +import copy +from lib.operators.base_kubernetes import BaseKubernetesOperator, BaseConfig +from typing_extensions import Self +from dataclasses import dataclass, field + +class SparkOperator(BaseKubernetesOperator): + template_fields = [*BaseKubernetesOperator.template_fields, 'spark_jar', 'spark_class', 'spark_configs', 'spark_packages'] def __init__( self, - k8s_context: str, - spark_jar: str, - spark_class: str, - spark_config: str = '', - spark_secret: str = '', - skip_env: List[str] = [], - skip_fail_env: List[str] = [], - **kwargs, + spark_class: Optional[str] = None, + spark_jar: Optional[str] = None, + spark_configs: List[dict] = [], + spark_config_volume: Optional[str] = None, + spark_packages: List[str] = [], + is_skip: bool = False, + is_skip_fail: bool = False, + + **kwargs ) -> None: - super().__init__( - is_delete_operator_pod=True, - in_cluster=config.k8s_in_cluster(k8s_context), - cluster_context=config.k8s_cluster_context(k8s_context), - namespace=config.k8s_namespace, - service_account_name=config.spark_service_account, - image=config.spark_image, - **kwargs, + super().__init__( + **kwargs ) - self.k8s_context = k8s_context self.spark_class = spark_class self.spark_jar = spark_jar - self.spark_config = spark_config - self.spark_secret = spark_secret - self.skip_env = skip_env - self.skip_fail_env = 
skip_fail_env + self.spark_configs = spark_configs + self.spark_config_volume = spark_config_volume + self.spark_packages = spark_packages + self.is_skip = is_skip + self.is_skip_fail = is_skip_fail + def execute(self, **kwargs): - if env in self.skip_env: + + if self.is_skip: raise AirflowSkipException() - self.cmds = ['/opt/client-entrypoint.sh'] - self.image_pull_policy = 'Always' - self.image_pull_secrets = [ - k8s.V1LocalObjectReference( - name='images-registry-credentials', - ), - ] - self.env_vars = [ + # Driver pod name + self.env_vars.append( k8s.V1EnvVar( name='SPARK_CLIENT_POD_NAME', value_from=k8s.V1EnvVarSource( field_ref=k8s.V1ObjectFieldSelector( field_path='metadata.name', ), - ), - ), - k8s.V1EnvVar( - name='AWS_ENDPOINT', - value='https://objets.juno.calculquebec.ca', - ), - k8s.V1EnvVar( - name='SPARK_JAR', - value=self.spark_jar, - ), - k8s.V1EnvVar( - name='SPARK_CLASS', - value=self.spark_class, - ), - k8s.V1EnvVar( - name='ES_USERNAME', - value_from=k8s.V1EnvVarSource( - secret_key_ref=k8s.V1SecretKeySelector( - name='opensearch-dags-credentials', - key='username', - ), - ), - ), + ) + ) + ) + self.env_vars.append( k8s.V1EnvVar( - name='ES_PASSWORD', + name='SPARK_CLIENT_NAMESPACE', value_from=k8s.V1EnvVarSource( - secret_key_ref=k8s.V1SecretKeySelector( - name='opensearch-dags-credentials', - key='password', + field_ref=k8s.V1ObjectFieldSelector( + field_path='metadata.namespace', ), - ), - ), - ] - self.volumes = [ - k8s.V1Volume( - name='spark-defaults', - config_map=k8s.V1ConfigMapVolumeSource( - name='spark-defaults', - ), - ), - k8s.V1Volume( - name='spark-s3-credentials', - secret=k8s.V1SecretVolumeSource( - secret_name='spark-s3-credentials', - ), - ), - ] - - self.volumes.append( - k8s.V1Volume( - name='opensearch-ca-certificate', - secret=k8s.V1SecretVolumeSource( - secret_name='opensearch-ca-certificate', - ), - ), - ) - - self.volume_mounts = [ - k8s.V1VolumeMount( - name='spark-defaults', - mount_path='/opt/spark-configs/defaults', - read_only=True, - ), - k8s.V1VolumeMount( - name='spark-s3-credentials', - mount_path='/opt/spark-configs/s3-credentials', - read_only=True, - ), - ] - - self.volume_mounts.append( - k8s.V1VolumeMount( - name='opensearch-ca-certificate', - mount_path='/opt/es-ca', - read_only=True, - ), + ) + ) ) - - if self.spark_config: + + driver_pod_name_config = ['--conf','spark.kubernetes.driver.pod.name=$(SPARK_CLIENT_POD_NAME)-driver'] + + # Build --conf attributes + spark_config_reversed = reversed(self.spark_configs) + merged_config = dict(ChainMap(*spark_config_reversed)) + + if 'spark.kubernetes.driver.container.image' not in merged_config.keys() and 'spark.kubernetes.container.image' not in merged_config.keys(): + merged_config['spark.kubernetes.driver.container.image']= self.image + if 'spark.kubernetes.executor.container.image' not in merged_config.keys() and 'spark.kubernetes.container.image' not in merged_config.keys(): + merged_config['spark.kubernetes.executor.container.image'] = self.image + if 'spark.master' not in merged_config.keys(): + merged_config['spark.master'] = 'k8s://https://kubernetes.default.svc' + if 'spark.kubernetes.namespace' not in merged_config.keys(): + merged_config['spark.kubernetes.namespace'] = '$(SPARK_CLIENT_NAMESPACE)' + if 'spark.kubernetes.authenticate.driver.serviceAccountName' not in merged_config.keys(): + merged_config['spark.kubernetes.authenticate.driver.serviceAccountName'] = self.service_account_name + + merged_config['spark.submit.deployMode'] = 'cluster' + + merged_config_attributes 
= [['--conf', f'{k}={v}'] for k,v in merged_config.items()] + merged_config_attributes = sum(merged_config_attributes, []) # flatten + + # Build --packages attribute + spark_packages_attributes = ['--packages', ','.join(self.spark_packages)] if self.spark_packages else [] + + # CMD + self.cmds = ['/opt/spark/bin/spark-submit'] + + self.arguments = [*spark_packages_attributes, *driver_pod_name_config, *merged_config_attributes, '--class', self.spark_class, self.spark_jar, *self.arguments] + + # Mount additional config volume + if self.spark_config_volume: self.volumes.append( k8s.V1Volume( - name=self.spark_config, + name='spark_config_volume', config_map=k8s.V1ConfigMapVolumeSource( - name=self.spark_config, - ), - ), - ) - self.volume_mounts.append( - k8s.V1VolumeMount( - name=self.spark_config, - mount_path=f'/opt/spark-configs/{self.spark_config}', - read_only=True, - ), - ) - if self.spark_secret: - self.volumes.append( - k8s.V1Volume( - name=self.spark_secret, - secret=k8s.V1SecretVolumeSource( - secret_name=self.spark_secret, + name=self.spark_config_volume, ), ), ) self.volume_mounts.append( k8s.V1VolumeMount( - name=self.spark_secret, - mount_path=f'/opt/spark-configs/{self.spark_secret}', + name='spark_config_volume', + mount_path=f'/opt/spark/conf', read_only=True, ), ) @@ -208,7 +147,71 @@ def execute(self, **kwargs): # Fail task if driver pod failed if driver_pod.items[0].status.phase != 'Succeeded': - if env in self.skip_fail_env: + if self.is_skip_fail: raise AirflowSkipException() else: raise AirflowFailException('Spark job failed') + +@dataclass +class SparkOperatorConfig(BaseConfig): + spark_class: Optional[str] = None + spark_jar: Optional[str] = None + spark_configs: List[dict] = field(default_factory=list) + spark_config_volume: Optional[str] = None + spark_packages: List[str] = field(default_factory=list) + is_skip: bool = False + is_skip_fail: bool = False + + def add_spark_conf(self, *new_config) -> Self: + c = copy.copy(self) + c.spark_configs = [*self.spark_configs, *new_config] + return c + + def add_package(self, new_package: str) -> Self: + c = copy.copy(self) + c.spark_packages = [*self.spark_packages, new_package] + return c + + def skip(self) -> Self: + c = copy.copy(self) + c.is_skip = True + return c + + def skip_fail(self) -> Self: + c = copy.copy(self) + c.is_skip_fail = True + return c + + def skip_all(self) -> Self: + c = copy.copy(self) + c.is_skip = True + c.is_skip_fail = True + return c + + def delete(self) -> Self: + c = copy.copy(self) + c.is_delete = True + return c + + def with_spark_class(self, spark_class: str) -> Self: + c = copy.copy(self) + c.spark_class = spark_class + return c + + def with_spark_jar(self, spark_jar:str) -> Self: + c = copy.copy(self) + c.spark_jar = spark_jar + return c + + def with_image(self, image: str) -> Self: + c = copy.copy(self) + c.image = image + return c + + def with_spark_config_volume(self, spark_config_volume: str) -> Self: + c = copy.copy(self) + c.spark_config_volume = spark_config_volume + return c + + def operator(self, class_to_instantiate: Type[SparkOperator] = SparkOperator,**kwargs) -> SparkOperator: + return super().build_operator(class_to_instantiate=class_to_instantiate, **kwargs) \ No newline at end of file diff --git a/dags/obo_parser.py b/dags/obo_parser.py index 35a6ebd..43be540 100644 --- a/dags/obo_parser.py +++ b/dags/obo_parser.py @@ -1,17 +1,16 @@ from airflow import DAG -from airflow.models.param import Param +from airflow.models import Variable, Param from datetime import datetime -from 
lib import config -from lib.config import env, Env, K8sContext +from lib.config import datalake_bucket, etl_base_config, spark_small_conf from lib.operators.spark import SparkOperator with DAG( - dag_id='obo_parser', + dag_id='obo-parser', start_date=datetime(2022, 1, 1), schedule_interval=None, params={ 'obo_url': Param('https://raw.githubusercontent.com/obophenotype/human-phenotype-ontology/master/hp.obo', type='string'), - 'ontology': Param("hpo_terms", type='string'), + 'ontology': Param('hpo_terms', type='string'), 'is_icd': Param(False, type='boolean'), 'required_top_node': Param("", type='string'), }, @@ -27,12 +26,13 @@ def is_icd() -> str: def required_top_node() -> str: return '{{ params.required_top_node }}' - import_task = SparkOperator( - task_id='obo_parser_task', - name='obo_parser-task', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.obo_parser_jar, - spark_class='bio.ferlab.HPOMain', - spark_config='etl-task-small', - arguments=[obo_url(), f'cqdg-{env}-app-datalake', ontology(), is_icd(), required_top_node()], - ) + import_task = etl_base_config \ + .with_image(Variable.get('obo_parser_image')) \ + .add_spark_conf(spark_small_conf) \ + .with_spark_jar('local:///app/obo-parser.jar') \ + .with_spark_class('bio.ferlab.HPOMain') \ + .args(obo_url(), datalake_bucket, ontology(), is_icd(), required_top_node()) \ + .operator( + task_id='obo_parser_task', + name='obo_parser-task' + ) diff --git a/dags/prepare_index.py b/dags/prepare_index.py deleted file mode 100644 index a945921..0000000 --- a/dags/prepare_index.py +++ /dev/null @@ -1,36 +0,0 @@ -from airflow import DAG -from airflow.models.param import Param -from datetime import datetime -from lib import config -from lib.config import env, K8sContext -from lib.operators.spark import SparkOperator - -with DAG( - dag_id='prepare_index', - start_date=datetime(2022, 1, 1), - schedule_interval=None, - params={ - 'release_id': Param('7', type='string'), - 'study_ids': Param('ST0000017', type='string'), - 'project': Param('cqdg', type='string'), - }, -) as dag: - def release_id() -> str: - return '{{ params.release_id }}' - - - def study_ids() -> str: - return '{{ params.study_ids }}' - - def project() -> str: - return '{{ params.project }}' - - prepare_index = SparkOperator( - task_id='prepare_index', - name='etl-prepare-index', - k8s_context=K8sContext.DEFAULT, - spark_jar=config.spark_prepare_index_jar, - spark_class='bio.ferlab.fhir.etl.PrepareIndex', - spark_config='etl-task-medium', - arguments=[f'config/{env}-{project()}.conf', 'default', 'all', release_id(), study_ids()], - ) diff --git a/dags/test_dag.py b/dags/test_dag.py deleted file mode 100644 index 53d5b18..0000000 --- a/dags/test_dag.py +++ /dev/null @@ -1,21 +0,0 @@ -from airflow import DAG -from airflow.models.param import Param -from datetime import datetime -from lib import config -from lib.config import env, Env, K8sContext -from lib.operators.spark import SparkOperator -from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator -from airflow.models import Variable -with DAG( - dag_id='test_dag', - start_date=datetime(2022, 1, 1), - schedule_interval=None -) as dag: - run_this = KubernetesPodOperator( - task_id="test_error_message", - image=Variable.get("hello"), - cmds=["/bin/sh"], - arguments=["-c", "echo : {{ var.value.hello}}"], - name="test-config-map" -) -
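Note: a minimal sketch of how SparkOperator.execute above collapses the spark_configs list into spark-submit --conf flags; later dictionaries in the list override earlier ones. The keys and values below are illustrative only.

from collections import ChainMap

# Same merge logic as SparkOperator.execute: reversing the list before
# building the ChainMap makes configs appended later (e.g. spark_small_conf)
# win over the base conf (e.g. spark_default_conf).
spark_configs = [
    {'spark.sql.shuffle.partitions': '1000', 'spark.eventLog.enabled': 'true'},  # base conf
    {'spark.sql.shuffle.partitions': '200'},                                     # override added later
]
merged_config = dict(ChainMap(*reversed(spark_configs)))
conf_args = sum([['--conf', f'{k}={v}'] for k, v in merged_config.items()], [])  # flatten
# conf_args == ['--conf', 'spark.sql.shuffle.partitions=200',
#               '--conf', 'spark.eventLog.enabled=true']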
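Note: a minimal usage sketch of the fluent config helpers defined above, in the style of the updated DAGs. The DAG id, jar path, Spark class and task arguments are hypothetical placeholders; only etl_base_config, spark_small_conf and default_config_file come from lib.config.

from airflow import DAG
from datetime import datetime
from lib.config import etl_base_config, spark_small_conf, default_config_file

with DAG(
    dag_id='example-etl-task',          # hypothetical DAG id
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
) as dag:
    # Each add_*/with_*/args call returns a copy (copy.copy), so the shared
    # etl_base_config is never mutated by an individual DAG.
    example_task = (
        etl_base_config
        .add_spark_conf(spark_small_conf)
        .with_spark_jar('local:///app/example-task.jar')      # hypothetical jar
        .with_spark_class('bio.ferlab.example.ExampleTask')   # hypothetical class
        .args('--config', default_config_file, '--steps', 'default')
        .operator(task_id='example_task', name='example-task')
    )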