diff --git a/dags/etl.py b/dags/etl.py index 9d97d5b..71155a7 100644 --- a/dags/etl.py +++ b/dags/etl.py @@ -47,4 +47,4 @@ def publish_operator(name:str): ) publish_operator('study') >> publish_operator('participant') >> publish_operator('file') >> publish_operator('biospecimen') - fhavro_export() >> etl_import >> prepare_index >> es_templates_update >> index >> publish >> arranger_task + fhavro_export() >> etl_import() >> prepare_index >> es_templates_update >> index >> publish >> arranger_task diff --git a/dags/etl_import.py b/dags/etl_import.py index 2f504c4..b7fa9b7 100644 --- a/dags/etl_import.py +++ b/dags/etl_import.py @@ -4,15 +4,16 @@ from lib.config import etl_base_config, spark_small_conf, import_jar, default_config_file, release_id, study_ids from lib.operators.spark import SparkOperator -etl_import = etl_base_config \ - .with_spark_class('bio.ferlab.fhir.etl.ImportTask') \ - .with_spark_jar(import_jar) \ - .add_spark_conf(spark_small_conf) \ - .args(default_config_file, 'default', release_id, study_ids) \ - .operator( - task_id='import_task', - name='etl-import-task' - ) +def etl_import(): + return etl_base_config \ + .with_spark_class('bio.ferlab.fhir.etl.ImportTask') \ + .with_spark_jar(import_jar) \ + .add_spark_conf(spark_small_conf) \ + .args(default_config_file, 'default', release_id, study_ids) \ + .operator( + task_id='import_task', + name='etl-import-task' + ) with DAG( dag_id='import', @@ -24,4 +25,4 @@ 'project': Param('cqdg', type='string'), }, ) as dag: - etl_import + etl_import()