feat: CDDG-719 etl variant and variant publish #32

Merged · 3 commits · Aug 28, 2024
62 changes: 62 additions & 0 deletions dags/etl-variant.py
@@ -0,0 +1,62 @@
from datetime import datetime

from airflow import DAG
from airflow.models.param import Param
from airflow.utils.task_group import TaskGroup

from es_templates_update import es_templates_update
from etl_enrich_specimens import etl_enrich_specimens
from etl_enrich_variants import variant_task_enrich_variants, variant_task_enrich_consequences
from etl_index_variants import index_variants
from etl_normalize_variants import extract_params, normalized_etl
from etl_prepare_index_variants import etl_variant_prepared
from etl_publish_variants import publish_task

with DAG(
dag_id='etl-variant',
start_date=datetime(2022, 1, 1),
schedule_interval=None,
# concurrency set to 1: only one task can run at a time, to avoid conflicts in the Delta table
concurrency=1,
params={
'study_code': Param('CAG', type='string'),
'owner': Param('jmichaud', type='string'),
'dateset_batches': Param(
[
{'dataset': 'dataset_dataset1', 'batches': ['annotated_vcf1','annotated_vcf2']},
{'dataset': 'dataset_dataset2', 'batches': ['annotated_vcf']}
],
schema = {
"type": "array",
"minItems": 1,
"items": {
"type": "object",
"default": {'dataset': 'dataset_default', 'batches': ['annotated_vcf']},
"properties": {
"dataset": {"type": "string"},
"batches": {"type": "array", "items": {"type": "string"}},
},
"required": ["dataset", "batches"]
},
}
),
'release_id': Param('7', type='string'),
'project': Param('cqdg', type='string'),
'es_port': Param('9200', type='string'),
},
) as dag:
params = extract_params()

with TaskGroup(group_id='normalize') as normalize:
normalized_etl(run_time_params = params, name='snv') >> normalized_etl(run_time_params = params, name='consequences')

with TaskGroup(group_id='enrich') as enrich:
variant_task_enrich_variants() >> variant_task_enrich_consequences()

with TaskGroup(group_id='prepared') as prepared:
etl_variant_prepared('variant_centric') >> etl_variant_prepared('gene_centric') >> etl_variant_prepared('variant_suggestions') >> etl_variant_prepared('gene_suggestions')

with TaskGroup(group_id='index') as index:
index_variants()

etl_enrich_specimens() >> normalize >> enrich >> prepared >> es_templates_update() >> index >> publish_task('variant_centric,variant_suggestions,gene_centric,gene_suggestions')
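
A run of this DAG is driven entirely by its params. A minimal, illustrative trigger payload (values below are placeholders, not taken from this PR) could look like the following, passed for example through airflow dags trigger etl-variant --conf '<JSON>':

# Hypothetical trigger configuration for the etl-variant DAG.
# Keys mirror the Param definitions above; values are examples only.
sample_conf = {
    'study_code': 'CAG',
    'owner': 'jmichaud',
    'dateset_batches': [
        {'dataset': 'dataset_dataset1', 'batches': ['annotated_vcf1', 'annotated_vcf2']},
        {'dataset': 'dataset_dataset2', 'batches': ['annotated_vcf']},
    ],
    'release_id': '7',
    'project': 'cqdg',
    'es_port': '9200',
}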
38 changes: 21 additions & 17 deletions dags/etl_enrich_specimens.py
@@ -1,8 +1,26 @@
from datetime import datetime

from airflow import DAG
from airflow.models.param import Param
from datetime import datetime
from lib.config import default_config_file, study_codes

from etl_prepare_index import etl_base_config, spark_small_conf, prepare_index_jar
from lib.config import default_config_file, study_code


def etl_enrich_specimens():
return etl_base_config \
.add_spark_conf(spark_small_conf) \
.with_spark_jar(prepare_index_jar) \
.with_spark_class('bio.ferlab.fhir.etl.Enrich') \
.args(
'--config', default_config_file,
'--steps', 'default',
'--app-name', 'enrich_specimen',
'--study-id', study_code,
).operator(
task_id='enrich-specimen',
name='etl-enrich-specimen'
)

with DAG(
dag_id='etl-enrich-specimen',
@@ -13,19 +31,5 @@
'project': Param('cqdg', type='string'),
},
) as dag:

etl_base_config \
.add_spark_conf(spark_small_conf) \
.with_spark_jar(prepare_index_jar) \
.with_spark_class('bio.ferlab.fhir.etl.Enrich') \
.args(
'--config', default_config_file,
'--steps', 'default',
'--app-name', 'enrich_specimen',
'--study-id', study_codes
).operator(
task_id='enrich-specimen',
name='etl-enrich-specimen'
)

etl_enrich_specimens()

25 changes: 13 additions & 12 deletions dags/etl_enrich_variants.py
@@ -9,24 +9,25 @@
.args('--config', default_config_file,
'--steps', 'default'
)
with DAG(
dag_id='etl-enrich-variants',
start_date=datetime(2022, 1, 1),
schedule_interval=None,
params={
'project': Param('cqdg', type='string'),
},
) as dag:

variant_task_enrich_variants = etl_variant_enrich_config.prepend_args('variants').operator(
def variant_task_enrich_variants():
return etl_variant_enrich_config.prepend_args('variants').operator(
task_id='variant_task_variant_enrich_variants',
name='etl-variant_task_variant_enrich_variants'
)

variant_task_enrich_consequences = etl_variant_enrich_config.prepend_args('consequences').operator(
def variant_task_enrich_consequences():
return etl_variant_enrich_config.prepend_args('consequences').operator(
task_id='variant_task_variant_enrich_consequences',
name='etl-variant_task_variant_enrich_consequences'
)


variant_task_enrich_variants >> variant_task_enrich_consequences
with DAG(
dag_id='etl-enrich-variants',
start_date=datetime(2022, 1, 1),
schedule_interval=None,
params={
'project': Param('cqdg', type='string'),
},
) as dag:
variant_task_enrich_variants() >> variant_task_enrich_consequences()
65 changes: 36 additions & 29 deletions dags/etl_index_variants.py
@@ -2,46 +2,53 @@
from airflow.models.param import Param
from airflow.utils.task_group import TaskGroup
from datetime import datetime
from lib.config import es_port, es_url, release_id, default_config_file, etl_index_config, datalake_bucket
from lib.config import es_port, es_url, release_id, default_config_file, etl_index_config, datalake_bucket, env, Env

etl_index_variant_config = etl_index_config \
.with_spark_class('bio.ferlab.fhir.etl.VariantIndexTask')

with DAG(
dag_id='etl-index-variants',
start_date=datetime(2022, 1, 1),
schedule_interval=None,
params={
'release_id': Param('7', type='string'),
'project': Param('cqdg', type='string')
},
) as dag:
def operator(indexName:str, chromosome:str='all'):
indexNameU = indexName.replace('_', '-')
return etl_index_variant_config \
.args(es_port, release_id, indexName, default_config_file, f's3a://{datalake_bucket}/es_index/{indexName}/', chromosome, es_url) \
.operator(
task_id=f'{indexName}_index',
name=f'etl-index-{indexNameU}-index'
)

chromosomes = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18',
'19', '20', '21', '22', 'X', 'Y', 'M']
def genes():
return operator('gene_centric') >> operator('gene_suggestions')

def operator(indexName:str, chromosome:str='all'):
indexNameU = indexName.replace('_', '-')
return etl_index_variant_config \
.args(es_port, release_id, indexName, default_config_file, f's3a://{datalake_bucket}/es_index/{indexName}/', chromosome, es_url) \
.operator(
task_id=f'{indexName}_index',
name=f'etl-index-{indexNameU}-index'
)

def genes():
return operator('gene_centric') >> operator('gene_suggestions')
def variants(chr):
with TaskGroup(group_id=f'variant_index-{chr}') as variant_index:
operator('variant_centric', chr) >> operator('variant_suggestions', chr)
return variant_index


def variants(chr):
with TaskGroup(group_id=f'variant_index-{chr}') as variant_index:
operator('variant_centric', chr) >> operator('variant_suggestions', chr)
return variant_index

def index_variants():
if env == Env.QA:
chromosomes = ['1']
else:
chromosomes = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18',
'19', '20', '21', '22', 'X', 'Y', 'M']

task_arr = [genes()]

for chr in chromosomes:
task = variants(chr)
for chromosome in chromosomes:
task = variants(chromosome)
task_arr[-1] >> task
task_arr.append(task)
return task_arr


with DAG(
dag_id='etl-index-variants',
start_date=datetime(2022, 1, 1),
schedule_interval=None,
params={
'release_id': Param('7', type='string'),
'project': Param('cqdg', type='string')
},
) as dag:
index_variants()
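
The loop in index_variants builds a strictly linear chain: the gene indices run first, then one variant_index-<chr> group per chromosome, each waiting for the previous one. A hedged sketch of the same ordering using Airflow's chain helper (assuming the same genes() and variants() factories are in scope):

from airflow.models.baseoperator import chain

def index_variants_chained(chromosomes):
    # Same dependency structure as the explicit task_arr loop:
    # genes() >> variants(chr_1) >> variants(chr_2) >> ...
    chain(genes(), *[variants(chromosome) for chromosome in chromosomes])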
103 changes: 79 additions & 24 deletions dags/etl_normalize_variants.py
@@ -1,48 +1,103 @@
from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.models import Param
from airflow.operators.python import get_current_context

from lib.config import default_config_file, study_code, etl_variant_config
from lib.operators.spark import SparkOperator

class NormalizeVariants(SparkOperator):
template_fields = [*SparkOperator.template_fields, 'arguments', 'dataset_batch']

def __init__(self,
dataset_batch,
**kwargs):
super().__init__(**kwargs)
self.dataset_batch = dataset_batch

from lib.config import batch, default_config_file, study_code, spark_large_conf, \
etl_variant_config, spark_small_conf
def execute(self, **kwargs):
# Append dataset and batch to arguments at runtime.
self.arguments.append('--dataset')
self.arguments.append(self.dataset_batch[0])
self.arguments.append('--batch')
self.arguments.append(self.dataset_batch[1])
super().execute(**kwargs)

normalized_etl = etl_variant_config \
.with_spark_class('bio.ferlab.etl.normalized.RunNormalizedGenomic') \
.args(
@task
def extract_params() -> list[(str, list[str])]:
"""Extract input arguments at runtime.
Returns: List of datasets with their batches
"""
context = get_current_context()
items = context["params"]["dateset_batches"]
r_list = []

for item in items:
bs = item['batches']
d = item['dataset']
for b in bs:
r_list.append((d, b))
return r_list

def normalized_etl(run_time_params, name):
return (etl_variant_config
.with_spark_class('bio.ferlab.etl.normalized.RunNormalizedGenomic')
.prepend_args(name)
.args(
'--config', default_config_file,
'--steps', 'default',
'--app-name', 'variant_task_consequences',
'--owner', '{{ params.owner }}',
'--dataset', '{{ params.dataset }}',
'--batch', batch,
'--study-code', study_code
) \
.add_package('io.projectglow:glow-spark3_2.12:2.0.0') \
.add_spark_conf({'spark.jars.excludes': 'org.apache.hadoop:hadoop-client,'
'io.netty:netty-all,'
'io.netty:netty-handler,'
'io.netty:netty-transport-native-epoll',
'spark.hadoop.io.compression.codecs': 'io.projectglow.sql.util.BGZFCodec',
}) \


def normalize_variant_operator(name):
etl = normalized_etl if name == 'snv' else normalized_etl
return etl.prepend_args(name).operator(
'--study-code', study_code)
.add_package('io.projectglow:glow-spark3_2.12:2.0.0')
.add_spark_conf({'spark.jars.excludes': 'org.apache.hadoop:hadoop-client,'
'io.netty:netty-all,'
'io.netty:netty-handler,'
'io.netty:netty-transport-native-epoll',
'spark.hadoop.io.compression.codecs': 'io.projectglow.sql.util.BGZFCodec',
})
.partial(
class_to_instantiate=NormalizeVariants,
task_id=f'normalize-{name}',
name=f'normalize-{name}')
.expand(dataset_batch=run_time_params))


with DAG(
dag_id='etl-normalize-variants',
start_date=datetime(2022, 1, 1),
schedule_interval=None,
# concurrency set to 1: only one task can run at a time, to avoid conflicts in the Delta table
concurrency=1,
params={
'study_code': Param('CAG', type='string'),
'owner': Param('jmichaud', type='string'),
'dataset': Param('dataset_default', type='string'),
'batch': Param('annotated_vcf', type='string'),
'project': Param('cqdg', type='string'),
'dateset_batches': Param(
[
{'dataset': 'dataset_dataset1', 'batches': ['annotated_vcf1','annotated_vcf2']},
{'dataset': 'dataset_dataset2', 'batches': ['annotated_vcf']}
],
schema = {
"type": "array",
"minItems": 1,
"items": {
"type": "object",
"default": {'dataset': 'dataset_default', 'batches': ['annotated_vcf']},
"properties": {
"dataset": {"type": "string"},
"batches": {"type": "array", "items": {"type": "string"}},
},
"required": ["dataset", "batches"]
},
}
),
},
) as dag:
normalize_variant_operator('snv') >> normalize_variant_operator('consequences')
params = extract_params()

normalized_etl(run_time_params = params, name='snv') >> normalized_etl(run_time_params = params, name='consequences')
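
The partial(...).expand(dataset_batch=run_time_params) call in normalized_etl relies on Airflow dynamic task mapping (2.3+): one normalize-<name> task instance is created per (dataset, batch) tuple returned by extract_params, and the custom NormalizeVariants operator appends that pair to the Spark arguments at runtime. A minimal, self-contained sketch of the same mechanism using only the stock TaskFlow API (DAG id and values below are hypothetical, not part of this PR):

from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(dag_id='mapping-sketch', start_date=datetime(2022, 1, 1), schedule_interval=None) as sketch_dag:

    @task
    def list_pairs() -> list[tuple[str, str]]:
        # Stand-in for extract_params(): one (dataset, batch) tuple per mapped task.
        return [('dataset_a', 'batch_1'), ('dataset_a', 'batch_2'), ('dataset_b', 'batch_1')]

    @task
    def normalize_one(dataset_batch: tuple[str, str]):
        # Each mapped instance receives a single (dataset, batch) pair.
        dataset, batch = dataset_batch
        print(f'normalizing dataset={dataset} batch={batch}')

    normalize_one.expand(dataset_batch=list_pairs())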