Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
jecos committed Mar 8, 2024
1 parent 359ac8b commit 5d53db5
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 45 deletions.
5 changes: 3 additions & 2 deletions dags/arranger.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
keycloak_client_secret_name = keycloak_client_secret_name,
)

arranger_task = arranger_config.args(
def arranger_task():
return arranger_config.args(
'--experimental-modules=node',
'--es-module-specifier-resolution=node',
'admin/run.mjs') \
Expand All @@ -29,5 +30,5 @@
schedule_interval=None,
params={},
) as dag:
arranger_task
arranger_task()

59 changes: 30 additions & 29 deletions dags/es_templates_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,35 +43,36 @@
mc cp ./templates/template_gene_suggestions.json myminio/{datalake_bucket}/templates/template_gene_suggestions.json
"""

es_templates_update = KubernetesPodOperator(
task_id='es_templates_update',
name='es-templates-update',
image="alpine:3.14",
is_delete_operator_pod=True,
cmds=["sh", "-cx"],
arguments=[script],
namespace=kube_config.namespace,
env_vars=[
k8s.V1EnvVar(
name='AWS_ENDPOINT',
value=aws_endpoint,
),
k8s.V1EnvVar(
name='AWS_ACCESS_KEY_ID',
value_from=k8s.V1EnvVarSource(
secret_key_ref=k8s.V1SecretKeySelector(
name=aws_secret_name,
key=aws_secret_access_key,
),
def es_templates_update():
return KubernetesPodOperator(
task_id='es_templates_update',
name='es-templates-update',
image="alpine:3.14",
is_delete_operator_pod=True,
cmds=["sh", "-cx"],
arguments=[script],
namespace=kube_config.namespace,
env_vars=[
k8s.V1EnvVar(
name='AWS_ENDPOINT',
value=aws_endpoint,
),
),
k8s.V1EnvVar(
name='AWS_SECRET_ACCESS_KEY',
value_from=k8s.V1EnvVarSource(
secret_key_ref=k8s.V1SecretKeySelector(
name=aws_secret_name,
key=aws_secret_secret_key,
k8s.V1EnvVar(
name='AWS_ACCESS_KEY_ID',
value_from=k8s.V1EnvVarSource(
secret_key_ref=k8s.V1SecretKeySelector(
name=aws_secret_name,
key=aws_secret_access_key,
),
),
),
), ]
)
k8s.V1EnvVar(
name='AWS_SECRET_ACCESS_KEY',
value_from=k8s.V1EnvVarSource(
secret_key_ref=k8s.V1SecretKeySelector(
name=aws_secret_name,
key=aws_secret_secret_key,
),
),
), ]
)
6 changes: 4 additions & 2 deletions dags/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
from airflow.models.param import Param
from airflow.utils.task_group import TaskGroup
from datetime import datetime
from lib.config import env, study_ids, release_id, es_port, es_url, etl_prepare_config, project, etl_publish_config
from lib.config import env, study_ids, release_id, es_port, es_url, etl_prepare_config, project
from es_templates_update import es_templates_update
from etl_import import etl_import
from arranger import arranger_task
from fhavro_export import fhavro_export
from index import index_operator
from prepare_index import prepare_index

with DAG(
dag_id='etl',
Expand Down Expand Up @@ -40,4 +41,5 @@ def publish_operator(name:str):
)

publish_operator('study') >> publish_operator('participant') >> publish_operator('file') >> publish_operator('biospecimen')
fhavro_export() >> etl_import() >> prepare_index >> es_templates_update >> index >> publish >> arranger_task

fhavro_export() >> etl_import() >> prepare_index() >> es_templates_update() >> index >> publish >> arranger_task()
6 changes: 0 additions & 6 deletions dags/lib/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,6 @@ class Env:
.with_spark_jar(publish_jar) \
.with_spark_class('bio.ferlab.fhir.etl.PublishTask')

etl_prepare_config = etl_base_config \
.add_spark_conf(spark_small_conf) \
.with_spark_jar(prepare_index_jar) \
.with_spark_class('bio.ferlab.fhir.etl.PrepareIndex') \
.args(default_config_file, 'default', 'all', release_id, study_ids)

etl_variant_config = etl_base_config \
.add_spark_conf(spark_large_conf) \
.with_spark_jar(variant_jar)
Expand Down
20 changes: 14 additions & 6 deletions dags/prepare_index.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
from airflow import DAG
from airflow.models.param import Param
from datetime import datetime
from lib.config import default_config_file, etl_prepare_config, release_id, study_ids
from lib.config import default_config_file, etl_base_config, spark_small_conf, prepare_index_jar, release_id, study_ids
from lib.operators.spark import SparkOperator

etl_prepare_config = etl_base_config \
.add_spark_conf(spark_small_conf) \
.with_spark_jar(prepare_index_jar) \
.with_spark_class('bio.ferlab.fhir.etl.PrepareIndex') \
.args(default_config_file, 'default', 'all', release_id, study_ids)

def prepare_index():
return etl_prepare_config \
.operator(
task_id='prepare_index',
name='etl-prepare-index'
)
with DAG(
dag_id='prepare_index',
start_date=datetime(2022, 1, 1),
Expand All @@ -14,9 +26,5 @@
'project': Param('cqdg', type='string'),
},
) as dag:
prepare_index()

prepare_index = etl_prepare_config \
.operator(
task_id='prepare_index',
name='etl-prepare-index'
)

0 comments on commit 5d53db5

Please sign in to comment.