Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
jecos committed Mar 8, 2024
1 parent abdd27e commit df49ff8
Showing 1 changed file with 18 additions and 42 deletions.
60 changes: 18 additions & 42 deletions dags/etl_normalize_variants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,23 @@
from airflow.models.param import Param
from datetime import datetime
from lib.config import release_id, batch, default_config_file, study_id, etl_variant_config

# Derive a config from etl_variant_config with the Spark class set to
# RunNormalizedGenomic.
etl_variant_normalized_config = (
    etl_variant_config
    .with_spark_class('bio.ferlab.etl.normalized.RunNormalizedGenomic')
)
# Job definition used by normalize_variant_operator: etl_variant_config with
# the RunNormalizedGenomic Spark class plus the command-line arguments below.
# The '{{ params.* }}' values are passed as literal placeholder strings here
# (presumably rendered by Airflow templating at run time -- confirm).
normalized_etl = (
    etl_variant_config
    .with_spark_class('bio.ferlab.etl.normalized.RunNormalizedGenomic')
    .args(
        '--config', default_config_file,
        '--steps', 'default',
        '--app-name', 'variant_task_consequences',
        '--release-id', release_id,
        '--owner', '{{ params.owner }}',
        '--dataset', '{{ params.dataset }}',
        '--batch', batch,
        '--study-id', study_id,
        '--study-code', '{{ params.study_code }}',
    )
)
def normalize_variant_operator(name):
    """Return the operator for the normalize step called ``name``.

    ``name`` is prepended to the arguments of the module-level
    ``normalized_etl`` job, and the resulting operator is created with both
    its ``task_id`` and its ``name`` set to ``normalize-<name>``.
    """
    step_job = normalized_etl.prepend_args(name)
    label = f'normalize-{name}'
    return step_job.operator(task_id=label, name=label)

with DAG(
dag_id='etl-normalize-variants',
Expand All @@ -20,42 +34,4 @@
'project': Param('cqdg', type='string'),
},
) as dag:


def study_code() -> str:
    """Return the literal ``{{ params.study_code }}`` placeholder string."""
    placeholder = '{{ params.study_code }}'
    return placeholder

def project() -> str:
    """Return the literal ``{{ params.project }}`` placeholder string."""
    placeholder = '{{ params.project }}'
    return placeholder

def dataset() -> str:
    """Return the literal ``{{ params.dataset }}`` placeholder string."""
    placeholder = '{{ params.dataset }}'
    return placeholder

def batch() -> str:
    """Return the literal ``{{ params.batch }}`` placeholder string."""
    # NOTE(review): this definition rebinds the name `batch` that is also
    # imported from lib.config -- confirm the shadowing is intended.
    placeholder = '{{ params.batch }}'
    return placeholder

def owner() -> str:
    """Return the literal ``{{ params.owner }}`` placeholder string."""
    placeholder = '{{ params.owner }}'
    return placeholder

# Attach the command-line arguments to etl_variant_normalized_config.
# The '{{ params.* }}' values are passed as literal placeholder strings here
# (presumably rendered by Airflow templating at run time -- confirm).
normalized_etl = etl_variant_normalized_config.args(
    '--config', default_config_file,
    '--steps', 'default',
    '--app-name', 'variant_task_consequences',
    '--release-id', release_id,
    '--owner', '{{ params.owner }}',
    '--dataset', '{{ params.dataset }}',
    '--batch', batch,
    '--study-id', study_id,
    '--study-code', '{{ params.study_code }}',
)

# Operator for the 'snv' step: 'snv' is prepended to the normalized_etl args.
variant_task_variants = (
    normalized_etl
    .prepend_args('snv')
    .operator(task_id='variant-task_snv', name='etl-variant-task_snv')
)

# Operator for the 'consequences' step, built the same way.
variant_task_consequences = (
    normalized_etl
    .prepend_args('consequences')
    .operator(
        task_id='variant_task_consequences',
        name='etl-variant_task_consequences',
    )
)

# The 'snv' task is declared upstream of the 'consequences' task.
variant_task_variants >> variant_task_consequences
# Create both normalize operators and declare 'snv' upstream of
# 'consequences'.
(
    normalize_variant_operator('snv')
    >> normalize_variant_operator('consequences')
)

0 comments on commit df49ff8

Please sign in to comment.