From bcad8fc1c3548c77001acf80ea05cf6661d4f939 Mon Sep 17 00:00:00 2001
From: Adrian
Date: Mon, 19 Aug 2024 13:29:55 -0400
Subject: [PATCH 1/3] feat: CQDG-719 etl variant and variant publish

---
 dags/etl-variant.py                | 44 ++++++++++++++++++++
 dags/etl_enrich_specimens.py       | 33 +++++++--------
 dags/etl_enrich_variants.py        | 25 ++++++------
 dags/etl_index_variants.py         | 65 +++++++++++++++++-------------
 dags/etl_prepare_index_variants.py | 29 ++++---------
 dags/etl_publish_variants.py       | 40 ++++++++++++++++++
 6 files changed, 158 insertions(+), 78 deletions(-)
 create mode 100644 dags/etl-variant.py
 create mode 100644 dags/etl_publish_variants.py

diff --git a/dags/etl-variant.py b/dags/etl-variant.py
new file mode 100644
index 0000000..fef70f7
--- /dev/null
+++ b/dags/etl-variant.py
@@ -0,0 +1,44 @@
+from datetime import datetime
+
+from airflow import DAG
+from airflow.models.param import Param
+from airflow.utils.task_group import TaskGroup
+
+from es_templates_update import es_templates_update
+from etl_enrich_specimens import etl_enrich_specimens
+from etl_enrich_variants import variant_task_enrich_variants, variant_task_enrich_consequences
+from etl_index import index_operator
+from etl_index_variants import index_variants
+from etl_normalize_variants import normalize_variant_operator
+from etl_prepare_index_variants import etl_variant_prepared
+from etl_publish import publish_task
+
+with DAG(
+    dag_id='etl-variant',
+    start_date=datetime(2022, 1, 1),
+    schedule_interval=None,
+    params={
+        'study_code': Param('CAG', type='string'), #FIXME study Codes vs study code !!!
+        'owner': Param('jmichaud', type='string'),
+        'dataset': Param('dataset_default', type='string'),
+        'batch': Param('annotated_vcf', type='string'),
+        'release_id': Param('7', type='string'),
+        'study_codes': Param('CAG', type='string'),
+        'project': Param('cqdg', type='string'),
+        'es_port': Param('9200', type='string')
+    },
+) as dag:
+
+    with TaskGroup(group_id='normalize') as normalize:
+        normalize_variant_operator('snv') >> normalize_variant_operator('consequences')
+
+    with TaskGroup(group_id='enrich') as enrich:
+        variant_task_enrich_variants() >> variant_task_enrich_consequences()
+
+    with TaskGroup(group_id='prepared') as prepared:
+        etl_variant_prepared('variant_centric') >> etl_variant_prepared('gene_centric') >> etl_variant_prepared('variant_suggestions') >> etl_variant_prepared('gene_suggestions')
+
+    with TaskGroup(group_id='index') as index:
+        index_variants()
+
+    etl_enrich_specimens() >> normalize >> enrich >> prepared >> es_templates_update() >> index >> publish_task('variant_centric,variant_suggestions,gene_centric,gene_suggestions')

diff --git a/dags/etl_enrich_specimens.py b/dags/etl_enrich_specimens.py
index 8ed87d7..7ccdff6 100644
--- a/dags/etl_enrich_specimens.py
+++ b/dags/etl_enrich_specimens.py
@@ -1,9 +1,24 @@
 from airflow import DAG
 from airflow.models.param import Param
 from datetime import datetime
-from lib.config import default_config_file, study_codes
+from lib.config import default_config_file, study_code, study_codes
 from etl_prepare_index import etl_base_config, spark_small_conf, prepare_index_jar
 
+def etl_enrich_specimens():
+    return etl_base_config \
+        .add_spark_conf(spark_small_conf) \
+        .with_spark_jar(prepare_index_jar) \
+        .with_spark_class('bio.ferlab.fhir.etl.Enrich') \
+        .args(
+            '--config', default_config_file,
+            '--steps', 'default',
+            '--app-name', 'enrich_specimen',
+            '--study-id', study_code or study_codes
+        ).operator(
+            task_id='enrich-specimen',
+            name='etl-enrich-specimen'
+        )
+
 with DAG(
     dag_id='etl-enrich-specimen',
     start_date=datetime(2022, 1, 1),
@@ -13,19 +28,5 @@
         'project': Param('cqdg', type='string'),
     },
 ) as dag:
-
-    etl_base_config \
-        .add_spark_conf(spark_small_conf) \
-        .with_spark_jar(prepare_index_jar) \
-        .with_spark_class('bio.ferlab.fhir.etl.Enrich') \
-        .args(
-            '--config', default_config_file,
-            '--steps', 'default',
-            '--app-name', 'enrich_specimen',
-            '--study-id', study_codes
-        ).operator(
-            task_id='enrich-specimen',
-            name='etl-enrich-specimen'
-        )
-
+    etl_enrich_specimens()

diff --git a/dags/etl_enrich_variants.py b/dags/etl_enrich_variants.py
index dc44f2c..e8469af 100644
--- a/dags/etl_enrich_variants.py
+++ b/dags/etl_enrich_variants.py
@@ -9,24 +9,25 @@
     .args('--config', default_config_file,
           '--steps', 'default'
           )
 
-with DAG(
-    dag_id='etl-enrich-variants',
-    start_date=datetime(2022, 1, 1),
-    schedule_interval=None,
-    params={
-        'project': Param('cqdg', type='string'),
-    },
-) as dag:
-    variant_task_enrich_variants = etl_variant_enrich_config.prepend_args('variants').operator(
+def variant_task_enrich_variants():
+    return etl_variant_enrich_config.prepend_args('variants').operator(
         task_id='variant_task_variant_enrich_variants',
         name='etl-variant_task_variant_enrich_variants'
     )
 
-    variant_task_enrich_consequences = etl_variant_enrich_config.prepend_args('consequences').operator(
+def variant_task_enrich_consequences():
+    return etl_variant_enrich_config.prepend_args('consequences').operator(
         task_id='variant_task_variant_enrich_consequences',
         name='etl-variant_task_variant_enrich_consequences'
     )
-
-    variant_task_enrich_variants >> variant_task_enrich_consequences
+
+with DAG(
+    dag_id='etl-enrich-variants',
+    start_date=datetime(2022, 1, 1),
+    schedule_interval=None,
+    params={
+        'project': Param('cqdg', type='string'),
+    },
+) as dag:
+    variant_task_enrich_variants() >> variant_task_enrich_consequences()

diff --git a/dags/etl_index_variants.py b/dags/etl_index_variants.py
index 9b74b46..efe50c7 100644
--- a/dags/etl_index_variants.py
+++ b/dags/etl_index_variants.py
@@ -2,46 +2,53 @@
 from airflow.models.param import Param
 from airflow.utils.task_group import TaskGroup
 from datetime import datetime
-from lib.config import es_port, es_url, release_id, default_config_file, etl_index_config, datalake_bucket
+from lib.config import es_port, es_url, release_id, default_config_file, etl_index_config, datalake_bucket, env, Env
 
 etl_index_variant_config = etl_index_config \
     .with_spark_class('bio.ferlab.fhir.etl.VariantIndexTask')
 
-with DAG(
-    dag_id='etl-index-variants',
-    start_date=datetime(2022, 1, 1),
-    schedule_interval=None,
-    params={
-        'release_id': Param('7', type='string'),
-        'project': Param('cqdg', type='string')
-    },
-) as dag:
+def operator(indexName:str, chromosome:str='all'):
+    indexNameU = indexName.replace('_', '-')
+    return etl_index_variant_config \
+        .args(es_port, release_id, indexName, default_config_file, f's3a://{datalake_bucket}/es_index/{indexName}/', chromosome, es_url) \
+        .operator(
+            task_id=f'{indexName}_index',
+            name=f'etl-index-{indexNameU}-index'
+        )
 
-    chromosomes = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18',
-                   '19', '20', '21', '22', 'X', 'Y', 'M']
+def genes():
+    return operator('gene_centric') >> operator('gene_suggestions')
 
-    def operator(indexName:str, chromosome:str='all'):
-        indexNameU = indexName.replace('_', '-')
-        return etl_index_variant_config \
-            .args(es_port, release_id, indexName, default_config_file, f's3a://{datalake_bucket}/es_index/{indexName}/', chromosome, es_url) \
-            .operator(
-                task_id=f'{indexName}_index',
-                name=f'etl-index-{indexNameU}-index'
-            )
-
-    def genes():
-        return operator('gene_centric') >> operator('gene_suggestions')
+def variants(chr):
+    with TaskGroup(group_id=f'variant_index-{chr}') as variant_index:
+        operator('variant_centric', chr) >> operator('variant_suggestions', chr)
+    return variant_index
 
-    def variants(chr):
-        with TaskGroup(group_id=f'variant_index-{chr}') as variant_index:
-            operator('variant_centric', chr) >> operator('variant_suggestions', chr)
-        return variant_index
-
+def index_variants():
+    if env == Env.QA:
+        chromosomes = ['1']
+    else:
+        chromosomes = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18',
+                       '19', '20', '21', '22', 'X', 'Y', 'M']
     task_arr = [genes()]
-    for chr in chromosomes:
-        task = variants(chr)
+    for chromosome in chromosomes:
+        task = variants(chromosome)
         task_arr[-1] >> task
         task_arr.append(task)
+    return task_arr
+
+
+with DAG(
+    dag_id='etl-index-variants',
+    start_date=datetime(2022, 1, 1),
+    schedule_interval=None,
+    params={
+        'release_id': Param('7', type='string'),
+        'project': Param('cqdg', type='string')
+    },
+) as dag:
+    index_variants()
\ No newline at end of file

diff --git a/dags/etl_prepare_index_variants.py b/dags/etl_prepare_index_variants.py
index aaa3701..60f53f8 100644
--- a/dags/etl_prepare_index_variants.py
+++ b/dags/etl_prepare_index_variants.py
@@ -9,6 +9,13 @@
     .args('--config', default_config_file,
           '--steps', 'default'
           )
+
+def etl_variant_prepared(name):
+    return etl_variant_prepared_config.prepend_args(name).operator(
+        task_id=f'variant_task_{name}',
+        name=f'etl-variant_task_{name}'
+    )
+
 with DAG(
     dag_id='etl-prepare-index-variant',
     start_date=datetime(2022, 1, 1),
@@ -18,24 +25,4 @@
     },
 ) as dag:
 
-    variant_task_variant_centric = etl_variant_prepared_config.prepend_args('variant_centric').operator(
-        task_id='variant_task_variant_centric',
-        name='etl-variant_task_variant_centric'
-    )
-
-    variant_task_gene_centric = etl_variant_prepared_config.prepend_args('gene_centric').operator(
-        task_id='variant_task_gene_centric',
-        name='etl-variant_task_gene_centric'
-    )
-
-    variant_task_variant_suggestions = etl_variant_prepared_config.prepend_args('variant_suggestions').operator(
-        task_id='variant_task_variant_suggestions',
-        name='etl-variant_variant_suggestions'
-    )
-
-    variant_task_gene_suggestions = etl_variant_prepared_config.prepend_args('gene_suggestions').operator(
-        task_id='variant_task_gene_suggestions',
-        name='etl-variant_gene_suggestions'
-    )
-
-    variant_task_variant_centric >> variant_task_gene_centric >> variant_task_variant_suggestions >> variant_task_gene_suggestions
+    etl_variant_prepared('variant_centric') >> etl_variant_prepared('gene_centric') >> etl_variant_prepared('variant_suggestions') >> etl_variant_prepared('gene_suggestions')

diff --git a/dags/etl_publish_variants.py b/dags/etl_publish_variants.py
new file mode 100644
index 0000000..5e48714
--- /dev/null
+++ b/dags/etl_publish_variants.py
@@ -0,0 +1,40 @@
+from airflow import DAG
+from airflow.models import Param, Variable
+from datetime import datetime
+from lib.config import kube_config, es_url, es_port, es_credentials_secret_name, \
+    es_credentials_secret_key_password, es_credentials_secret_key_username, release_id, study_codes
+from lib.operators.publish import PublishConfig
+
+etl_publish_config = PublishConfig(
+    es_url = es_url,
+    kube_config = kube_config,
+    image = Variable.get('publish_image'),
+    es_port = es_port,
+    es_cert_secret_name = 'opensearch-ca-certificate',
+    es_credentials_secret_name = es_credentials_secret_name,
+    es_credentials_secret_key_username = es_credentials_secret_key_username,
+    es_credentials_secret_key_password = es_credentials_secret_key_password,
+)
+
+def publish_task(job_types: str):
+    return etl_publish_config.args(
+        '-n', 'https://search-workers.qa.juno.cqdg.ferlab.bio',
+        '-p', es_port,
+        '-r', release_id,
+        '-j', job_types) \
+        .operator(
+            task_id='etl_publish_variant',
+            name='etl-publish_variant',
+        )
+with DAG(
+    dag_id='etl-publish-variant',
+    start_date=datetime(2022, 1, 1),
+    schedule_interval=None,
+    params={
+        'es_port': Param('9200', type='string'),
+        'release_id': Param('0', type='string'),
+        'job_types': Param('variant_centric,variant_suggestions,gene_centric,gene_suggestions', type='string'),
+    },
+) as dag:
+    publish_task('{{ params.job_types }}')
+

From 44eba014c8ee58ba0e77a9bbfea1449ee0c6eff2 Mon Sep 17 00:00:00 2001
From: Adrian
Date: Fri, 23 Aug 2024 09:21:50 -0400
Subject: [PATCH 2/3] feat: CQDG-719 refactor variants normalize

---
 dags/etl-variant.py            |  38 ++++++++----
 dags/etl_enrich_specimens.py   |   9 ++-
 dags/etl_normalize_variants.py | 103 +++++++++++++++++++++++++--------
 dags/etl_publish.py            |  12 ++--
 4 files changed, 119 insertions(+), 43 deletions(-)

diff --git a/dags/etl-variant.py b/dags/etl-variant.py
index fef70f7..437ce40 100644
--- a/dags/etl-variant.py
+++ b/dags/etl-variant.py
@@ -5,32 +5,50 @@
 from airflow.utils.task_group import TaskGroup
 
 from es_templates_update import es_templates_update
-from etl_enrich_specimens import etl_enrich_specimens
+from etl_enrich_specimens import etl_enrich_specimens
 from etl_enrich_variants import variant_task_enrich_variants, variant_task_enrich_consequences
-from etl_index import index_operator
 from etl_index_variants import index_variants
-from etl_normalize_variants import normalize_variant_operator
+from etl_normalize_variants import extract_params, normalized_etl
 from etl_prepare_index_variants import etl_variant_prepared
-from etl_publish import publish_task
+from etl_publish_variants import publish_task
 
 with DAG(
     dag_id='etl-variant',
     start_date=datetime(2022, 1, 1),
     schedule_interval=None,
+    # concurrency set to 1, only one task can run at a time to avoid conflicts in Delta table
+    concurrency=1,
     params={
-        'study_code': Param('CAG', type='string'), #FIXME study Codes vs study code !!!
+        'study_code': Param('CAG', type='string'),
         'owner': Param('jmichaud', type='string'),
-        'dataset': Param('dataset_default', type='string'),
-        'batch': Param('annotated_vcf', type='string'),
+        'dateset_batches': Param(
+            [
+                {'dataset': 'dataset_dataset1', 'batches': ['annotated_vcf1','annotated_vcf2']},
+                {'dataset': 'dataset_dataset2', 'batches': ['annotated_vcf']}
+            ],
+            schema = {
+                "type": "array",
+                "minItems": 1,
+                "items": {
+                    "type": "object",
+                    "default": {'dataset': 'dataset_default', 'batches': ['annotated_vcf']},
+                    "properties": {
+                        "dataset": {"type": "string"},
+                        "batches": {"type": "array", "items": {"type": "string"}},
+                    },
+                    "required": ["dataset", "batches"]
+                },
+            }
+        ),
         'release_id': Param('7', type='string'),
-        'study_codes': Param('CAG', type='string'),
         'project': Param('cqdg', type='string'),
-        'es_port': Param('9200', type='string')
+        'es_port': Param('9200', type='string'),
     },
 ) as dag:
+    params = extract_params()
 
     with TaskGroup(group_id='normalize') as normalize:
-        normalize_variant_operator('snv') >> normalize_variant_operator('consequences')
+        normalized_etl(run_time_params = params, name='snv') >> normalized_etl(run_time_params = params, name='consequences')
 
     with TaskGroup(group_id='enrich') as enrich:
         variant_task_enrich_variants() >> variant_task_enrich_consequences()

diff --git a/dags/etl_enrich_specimens.py b/dags/etl_enrich_specimens.py
index 7ccdff6..928d03a 100644
--- a/dags/etl_enrich_specimens.py
+++ b/dags/etl_enrich_specimens.py
@@ -1,8 +1,11 @@
+from datetime import datetime
+
 from airflow import DAG
 from airflow.models.param import Param
-from datetime import datetime
-from lib.config import default_config_file, study_code, study_codes
+
 from etl_prepare_index import etl_base_config, spark_small_conf, prepare_index_jar
+from lib.config import default_config_file, study_code
+
 
 def etl_enrich_specimens():
     return etl_base_config \
@@ -13,7 +16,7 @@ def etl_enrich_specimens():
             '--config', default_config_file,
             '--steps', 'default',
             '--app-name', 'enrich_specimen',
-            '--study-id', study_code or study_codes
+            '--study-id', study_code,
         ).operator(
             task_id='enrich-specimen',
             name='etl-enrich-specimen'

diff --git a/dags/etl_normalize_variants.py b/dags/etl_normalize_variants.py
index 3cde37b..9105ddd 100644
--- a/dags/etl_normalize_variants.py
+++ b/dags/etl_normalize_variants.py
@@ -1,48 +1,103 @@
+from __future__ import annotations
+
 from datetime import datetime
 
 from airflow import DAG
+from airflow.decorators import task
 from airflow.models import Param
+from airflow.operators.python import get_current_context
+
+from lib.config import default_config_file, study_code, etl_variant_config
+from lib.operators.spark import SparkOperator
+
+class NormalizeVariants(SparkOperator):
+    template_fields = [*SparkOperator.template_fields, 'arguments', 'dataset_batch']
+
+    def __init__(self,
+                 dataset_batch,
+                 **kwargs):
+        super().__init__(**kwargs)
+        self.dataset_batch = dataset_batch
 
-from lib.config import batch, default_config_file, study_code, spark_large_conf, \
-    etl_variant_config, spark_small_conf
+    def execute(self, **kwargs):
+        # Append dataset and batch to arguments at runtime.
+        self.arguments.append('--dataset')
+        self.arguments.append(self.dataset_batch[0])
+        self.arguments.append('--batch')
+        self.arguments.append(self.dataset_batch[1])
+        super().execute(**kwargs)
 
-normalized_etl = etl_variant_config \
-    .with_spark_class('bio.ferlab.etl.normalized.RunNormalizedGenomic') \
-    .args(
+@task
+def extract_params() -> list[(str, list[str])]:
+    """Extract input arguments at runtime.
+    Returns: List of datasets with their batches
+    """
+    context = get_current_context()
+    items = context["params"]["dateset_batches"]
+    r_list = []
+
+    for item in items:
+        bs = item['batches']
+        d = item['dataset']
+        for b in bs:
+            r_list.append((d, b))
+    return r_list
+
+def normalized_etl(run_time_params, name):
+    return (etl_variant_config
+            .with_spark_class('bio.ferlab.etl.normalized.RunNormalizedGenomic')
+            .prepend_args(name)
+            .args(
         '--config', default_config_file,
         '--steps', 'default',
         '--app-name', 'variant_task_consequences',
         '--owner', '{{ params.owner }}',
-        '--dataset', '{{ params.dataset }}',
-        '--batch', batch,
-        '--study-code', study_code
-    ) \
-    .add_package('io.projectglow:glow-spark3_2.12:2.0.0') \
-    .add_spark_conf({'spark.jars.excludes': 'org.apache.hadoop:hadoop-client,'
-                                            'io.netty:netty-all,'
-                                            'io.netty:netty-handler,'
-                                            'io.netty:netty-transport-native-epoll',
-                     'spark.hadoop.io.compression.codecs': 'io.projectglow.sql.util.BGZFCodec',
-                     }) \
-
-
-def normalize_variant_operator(name):
-    etl = normalized_etl if name == 'snv' else normalized_etl
-    return etl.prepend_args(name).operator(
+        '--study-code', study_code)
+            .add_package('io.projectglow:glow-spark3_2.12:2.0.0')
+            .add_spark_conf({'spark.jars.excludes': 'org.apache.hadoop:hadoop-client,'
+                                                    'io.netty:netty-all,'
+                                                    'io.netty:netty-handler,'
+                                                    'io.netty:netty-transport-native-epoll',
+                             'spark.hadoop.io.compression.codecs': 'io.projectglow.sql.util.BGZFCodec',
+                             })
+            .partial(
+                class_to_instantiate=NormalizeVariants,
         task_id=f'normalize-{name}',
         name=f'normalize-{name}')
+            .expand(dataset_batch=run_time_params))
 
 with DAG(
     dag_id='etl-normalize-variants',
     start_date=datetime(2022, 1, 1),
     schedule_interval=None,
+    # concurrency set to 1, only one task can run at a time to avoid conflicts in Delta table
+    concurrency=1,
     params={
         'study_code': Param('CAG', type='string'),
         'owner': Param('jmichaud', type='string'),
-        'dataset': Param('dataset_default', type='string'),
-        'batch': Param('annotated_vcf', type='string'),
         'project': Param('cqdg', type='string'),
+        'dateset_batches': Param(
+            [
+                {'dataset': 'dataset_dataset1', 'batches': ['annotated_vcf1','annotated_vcf2']},
+                {'dataset': 'dataset_dataset2', 'batches': ['annotated_vcf']}
+            ],
+            schema = {
+                "type": "array",
+                "minItems": 1,
+                "items": {
+                    "type": "object",
+                    "default": {'dataset': 'dataset_default', 'batches': ['annotated_vcf']},
+                    "properties": {
+                        "dataset": {"type": "string"},
+                        "batches": {"type": "array", "items": {"type": "string"}},
+                    },
+                    "required": ["dataset", "batches"]
+                },
+            }
+        ),
     },
 ) as dag:
-    normalize_variant_operator('snv') >> normalize_variant_operator('consequences')
+    params = extract_params()
+
+    normalized_etl(run_time_params = params, name='snv') >> normalized_etl(run_time_params = params, name='consequences')
\ No newline at end of file

diff --git a/dags/etl_publish.py b/dags/etl_publish.py
index 24aa167..62b5382 100644
--- a/dags/etl_publish.py
+++ b/dags/etl_publish.py
@@ -18,12 +18,12 @@
 
 def publish_task(job_types: str):
     return etl_publish_config.args(
-        es_url,
-        es_port,
-        release_id,
-        study_codes,
-        job_types) \
-        .operator(
+        '-n', 'https://search-workers.qa.juno.cqdg.ferlab.bio',
+        '-p', es_port,
+        '-r', release_id,
+        '-j', job_types,
+        '-s', study_codes) \
+        .operator(
         task_id='etl_publish',
         name='etl-publish',
     )

From 5b847892c35057d6e7a0327653ebf2a98fa5ee42 Mon Sep 17 00:00:00 2001
From: Adrian
Date: Wed, 28 Aug 2024 09:01:12 -0400
Subject: [PATCH 3/3] fix: CQDG-719 replace hard coded ES URL

---
 dags/etl_publish.py          | 2 +-
 dags/etl_publish_variants.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/dags/etl_publish.py b/dags/etl_publish.py
index 62b5382..dd463b1 100644
--- a/dags/etl_publish.py
+++ b/dags/etl_publish.py
@@ -18,7 +18,7 @@
 
 def publish_task(job_types: str):
     return etl_publish_config.args(
-        '-n', 'https://search-workers.qa.juno.cqdg.ferlab.bio',
+        '-n', es_url,
         '-p', es_port,
         '-r', release_id,
         '-j', job_types,

diff --git a/dags/etl_publish_variants.py b/dags/etl_publish_variants.py
index 5e48714..6819a48 100644
--- a/dags/etl_publish_variants.py
+++ b/dags/etl_publish_variants.py
@@ -18,7 +18,7 @@
 
 def publish_task(job_types: str):
     return etl_publish_config.args(
-        '-n', 'https://search-workers.qa.juno.cqdg.ferlab.bio',
+        '-n', es_url,
         '-p', es_port,
         '-r', release_id,
         '-j', job_types) \
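
-- 
Note on the dynamic mapping introduced in PATCH 2 (an illustration, not part of the
patches themselves): `extract_params` flattens the `dateset_batches` param into
(dataset, batch) pairs, and `normalized_etl` uses `.partial(...).expand(dataset_batch=...)`
so Airflow maps one NormalizeVariants task per pair. A minimal sketch of that flattening,
using the default param values from the DAGs (the helper name is hypothetical; the real
implementation lives in the `extract_params` task above):

    def flatten_dataset_batches(items):
        """One (dataset, batch) pair per batch entry, mirroring extract_params."""
        return [(item['dataset'], b) for item in items for b in item['batches']]

    params = [
        {'dataset': 'dataset_dataset1', 'batches': ['annotated_vcf1', 'annotated_vcf2']},
        {'dataset': 'dataset_dataset2', 'batches': ['annotated_vcf']},
    ]
    assert flatten_dataset_batches(params) == [
        ('dataset_dataset1', 'annotated_vcf1'),
        ('dataset_dataset1', 'annotated_vcf2'),
        ('dataset_dataset2', 'annotated_vcf'),
    ]

Each pair is then appended to the Spark arguments in NormalizeVariants.execute as
`--dataset <dataset> --batch <batch>`, so every (dataset, batch) combination runs as its
own normalization job, serialized by `concurrency=1` to avoid concurrent Delta table writes.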