
Commit

fix: CQDG-642 Normalize variant
jecos committed Mar 13, 2024
1 parent 5490494 commit 93ca2da
Showing 9 changed files with 125 additions and 136 deletions.
4 changes: 2 additions & 2 deletions dags/create_tables.py
@@ -6,7 +6,7 @@
from airflow.exceptions import ParamValidationError
from airflow.models import Param, TaskInstance, DagRun

from lib.config import spark_small_conf, default_config_file, default_params, variant_jar, etl_base_config
from lib.config import spark_small_conf, default_config_file, default_params, variant_jar, etl_deps_config
from lib.operators.spark import SparkOperator

# Update default params
@@ -56,7 +56,7 @@ def execute(self, **kwargs):
super().execute(**kwargs)


etl_base_config.add_spark_conf(spark_small_conf) \
etl_deps_config.add_spark_conf(spark_small_conf) \
.with_spark_class('bio.ferlab.datalake.spark3.hive.CreateTableAndView') \
.with_spark_jar(variant_jar) \
.args('--config', default_config_file,
40 changes: 20 additions & 20 deletions dags/etl_enrich_snv.py
@@ -1,8 +1,9 @@
from datetime import datetime

from airflow import DAG
from airflow.models.param import Param
from datetime import datetime
from lib.config import default_config_file, etl_base_config, spark_medium_conf, variant_jar, study_id, dataset, batch
from lib.operators.spark import SparkOperator

from lib.config import default_config_file, etl_deps_config, spark_medium_conf, variant_jar, study_id, dataset, batch

with DAG(
dag_id='etl-enrich-snv',
@@ -15,22 +16,21 @@
'project': Param('cqdg', type='string'),
},
) as dag:

variant_task_enrich_snv = etl_base_config \
.with_spark_jar(variant_jar) \
.with_spark_class('bio.ferlab.etl.enriched.RunEnrichGenomic') \
.add_spark_conf(spark_medium_conf) \
.args('snv') \
.args(
'--config', default_config_file,
'--steps', 'default',
'--app-name', 'variant_task_enrich_snv',
'--study-id', study_id,
'--dataset', dataset,
'--batch', batch
).operator(
task_id='variant_task_variant_enrich_snv',
name='etl-variant_task_variant_enrich_snv'
)
variant_task_enrich_snv = etl_deps_config \
.with_spark_jar(variant_jar) \
.with_spark_class('bio.ferlab.etl.enriched.RunEnrichGenomic') \
.add_spark_conf(spark_medium_conf) \
.args('snv') \
.args(
'--config', default_config_file,
'--steps', 'default',
'--app-name', 'variant_task_enrich_snv',
'--study-id', study_id,
'--dataset', dataset,
'--batch', batch
).operator(
task_id='variant_task_variant_enrich_snv',
name='etl-variant_task_variant_enrich_snv'
)

variant_task_enrich_snv
4 changes: 2 additions & 2 deletions dags/etl_enrich_specimens.py
@@ -2,7 +2,7 @@
from airflow.models.param import Param
from datetime import datetime
from lib.config import default_config_file, study_ids
from etl_prepare_index import etl_base_config, spark_small_conf, prepare_index_jar
from etl_prepare_index import etl_deps_config, spark_small_conf, prepare_index_jar

with DAG(
dag_id='etl-enrich-specimen',
@@ -14,7 +14,7 @@
},
) as dag:

etl_base_config \
etl_deps_config \
.add_spark_conf(spark_small_conf) \
.with_spark_jar(prepare_index_jar) \
.with_spark_class('bio.ferlab.fhir.etl.Enrich') \
4 changes: 2 additions & 2 deletions dags/etl_import.py
@@ -1,11 +1,11 @@
from airflow import DAG
from airflow.models.param import Param
from datetime import datetime
from lib.config import etl_base_config, spark_small_conf, import_jar, default_config_file, release_id, study_ids
from lib.config import etl_deps_config, spark_small_conf, import_jar, default_config_file, release_id, study_ids
from lib.operators.spark import SparkOperator

def etl_import():
return etl_base_config \
return etl_deps_config \
.with_spark_class('bio.ferlab.fhir.etl.ImportTask') \
.with_spark_jar(import_jar) \
.add_spark_conf(spark_small_conf) \
107 changes: 45 additions & 62 deletions dags/etl_normalize_variants.py
@@ -1,75 +1,57 @@
from airflow import DAG
from airflow.models.param import Param
from datetime import datetime
from lib.config import release_id, batch, kube_config, default_config_file, study_id, datalake_bucket, aws_secret_name, aws_secret_access_key, aws_secret_secret_key, aws_endpoint, hive_metastore_uri, spark_large_conf

from airflow import DAG
from airflow.models import Param
from kubernetes.client import models as k8s
from lib.operators.spark import SparkOperatorConfig
spark_default_conf = {
'spark.sql.shuffle.partitions' : '1000',
'spark.sql.extensions' : 'io.delta.sql.DeltaSparkSessionExtension',
'spark.sql.catalog.spark_catalog' : 'org.apache.spark.sql.delta.catalog.DeltaCatalog',
'spark.hadoop.fs.s3a.impl' : 'org.apache.hadoop.fs.s3a.S3AFileSystem',
'spark.hadoop.fs.s3a.fast.upload' : 'true',
'spark.hadoop.fs.s3a.connection.ssl.enabled' : 'true',
'spark.hadoop.fs.s3a.path.style.access' : 'true',
'spark.hadoop.fs.s3a.endpoint' : aws_endpoint,
'spark.hadoop.fs.s3a.aws.credentials.provider' : 'com.amazonaws.auth.EnvironmentVariableCredentialsProvider',
'spark.kubernetes.driver.secretKeyRef.AWS_ACCESS_KEY_ID' : f'{aws_secret_name}:{aws_secret_access_key}',
'spark.kubernetes.driver.secretKeyRef.AWS_SECRET_ACCESS_KEY' : f'{aws_secret_name}:{aws_secret_secret_key}',
'spark.kubernetes.executor.secretKeyRef.AWS_ACCESS_KEY_ID' : f'{aws_secret_name}:{aws_secret_access_key}',
'spark.kubernetes.executor.secretKeyRef.AWS_SECRET_ACCESS_KEY' : f'{aws_secret_name}:{aws_secret_secret_key}',
'spark.hadoop.hive.metastore.uris' : hive_metastore_uri,
'spark.sql.warehouse.dir' : f's3a://{datalake_bucket}/hive',
'spark.eventLog.enabled' : 'true',
'spark.eventLog.dir' : f's3a://{datalake_bucket}/spark-logs',
'spark.driver.extraJavaOptions' : '"-Divy.cache.dir=/tmp -Divy.home=/tmp"',
'spark.jars.ivy': '/tmp'
}

normalized_etl= SparkOperatorConfig(
spark_configs=[spark_default_conf, spark_large_conf],
image = 'apache/spark:3.4.1',
kube_config=kube_config,
is_delete_operator_pod=False
) \
.with_spark_class('bio.ferlab.etl.normalized.RunNormalizedGenomic').args(
'--config', default_config_file,
'--steps', 'default',
'--app-name', 'variant_task_consequences',
'--owner', '{{ params.owner }}',
'--dataset', '{{ params.dataset }}',
'--batch', batch,
'--study-id', study_id,
'--study-code', '{{ params.study_code }}'
from lib.config import release_id, batch, default_config_file, study_id, datalake_bucket, aws_secret_name, \
aws_secret_access_key, aws_secret_secret_key, etl_base_config

"""
!!!WARNING Before changing this configuration, read this :
Normalize variant has a dependency on Glow 1.2.1 which is NOT compatible with Spark 3.4.
- We use our ETL image as
a client to get the jar, and pass file://path_to_jar in spark submit argument
- Spark client copy this jar in S3 (spark.kubernetes.file.upload.path configuration) to share it with driver.
So AWS S3 credentials are required by the client.
- For driver and executors we use our own Ferlab image
based on apache/spark:v3.3.2. This image just add hadoop-aws dependencies because glow requires to have these jars in
spark jars directory. Use packages configuration in spark job is NOT enough.
- variant-normalize-task.jar embed delta-core, otherwise we got ClassNotFoundException on DeltaTable,
even if delta is specified in packages configuration. We think this is an issue due to spark:v3.2.2 image, but not sure.
Glow 2.0.0 should fixed these issues, but it is not yet available on Maven central.
"""
normalized_etl = etl_base_config \
.with_spark_class('bio.ferlab.etl.normalized.RunNormalizedGenomic').args(
'--config', default_config_file,
'--steps', 'default',
'--app-name', 'variant_task_consequences',
'--owner', '{{ params.owner }}',
'--dataset', '{{ params.dataset }}',
'--batch', batch,
'--study-id', study_id,
'--study-code', '{{ params.study_code }}'
) \
.with_spark_jar('s3a://cqdg-qa-app-datalake/jars/variant-task.jar') \
.add_package('io.delta:delta-core_2.12:2.3.0') \
.add_spark_conf({
# 'spark.jars.packages': 'io.delta:delta-core_2.12:2.3.0',
# 'spark.jars.packages': 'io.delta:delta-core_2.12:2.1.1,io.projectglow:glow-spark3_2.12:1.2.1',
# 'spark.jars.excludes':'org.apache.hadoop:hadoop-client',
.with_spark_jar(
# The ETL client container has this file locally; it is shared with the driver through spark.kubernetes.file.upload.path
'file:///app/variant-normalize-task.jar'
) \
.add_spark_conf({
'spark.jars.packages': 'org.apache.hadoop:hadoop-aws:3.3.2',
# required by the client to push the ETL jar file to S3
'spark.kubernetes.container.image': 'ferlabcrsj/spark:469f2fc61f06fcfa73c1480a5e5f73f59768152d',
# 'spark.kubernetes.container.image': 'apache/spark:3.4.1',
# 'spark.kubernetes.container.image': 'apache/spark:v3.3.2',
# 'spark.kubernetes.file.upload.path': f's3a://{datalake_bucket}/dependencies'
}) #.with_spark_jar('https://github.com/Ferlab-Ste-Justine/etl-cqdg-portal/releases/download/v2.21.5/variant-task.jar')

###
# - ferlabcrsj/spark:469f2fc61f06fcfa73c1480a5e5f73f59768152d (spark 3.3.2 + hadoop-aws) + https://github.com/Ferlab-Ste-Justine/etl-cqdg-portal/releases/download/v2.21.5/variant-task.jar => works
# - apache/spark:3.3.2 + https://github.com/Ferlab-Ste-Justine/etl-cqdg-portal/releases/download/v2.21.5/variant-task.jar + hadoop-aws in packages => does not work because of a classpath problem (java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found)
# - apache/spark:3.4.1 + https://github.com/Ferlab-Ste-Justine/etl-cqdg-portal/releases/download/v2.21.5/variant-task.jar => does not work because of a Delta version incompatibility
# - ferlabcrsj/spark:469f2fc61f06fcfa73c1480a5e5f73f59768152d (spark 3.3.2 + hadoop-aws) + new variant-task.jar with delta 2.1.1 => does not work: java.lang.NoClassDefFoundError: io/delta/tables/DeltaTable$
# use the Ferlab spark 3.3.2 image because it contains hadoop-aws in the Spark jars directory; otherwise Glow gives a ClassNotFoundException
'spark.kubernetes.file.upload.path': f's3a://{datalake_bucket}/dependencies' # directory used to share the ETL jar
})


# s3a://cqdg-qa-app-datalake/jars/variant-task.jar
###

def normalize_variant_operator(name):
etl = normalized_etl.args('--release-id', release_id ) if name == 'snv' else normalized_etl
etl = normalized_etl.args('--release-id', release_id) if name == 'snv' else normalized_etl
return etl.prepend_args(name).operator(
task_id=f'normalize-{name}',
name=f'normalize-{name}',
env_vars=[
# The client needs AWS credentials in order to push the jar file to spark.kubernetes.file.upload.path
k8s.V1EnvVar(
name='AWS_ACCESS_KEY_ID',
value_from=k8s.V1EnvVarSource(
@@ -87,9 +69,10 @@ def normalize_variant_operator(name):
key=aws_secret_secret_key,
),
),
), ]
), ]
)


with DAG(
dag_id='etl-normalize-variants',
start_date=datetime(2022, 1, 1),
@@ -104,4 +87,4 @@ def normalize_variant_operator(name):
'project': Param('cqdg', type='string'),
},
) as dag:
normalize_variant_operator('snv') >> normalize_variant_operator('consequences')
normalize_variant_operator('snv') >> normalize_variant_operator('consequences')
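
The warning in this file boils down to a handful of raw Spark settings: the client submits a jar that only exists inside the ETL client container (file://...), Spark uploads it to spark.kubernetes.file.upload.path so the driver can fetch it, and the driver and executors read their S3 credentials from a Kubernetes secret. The sketch below is illustrative only and is not part of this commit; the bucket, secret name, and secret keys are placeholders standing in for the values resolved from lib.config.

# Illustrative sketch (not part of the commit): the Spark settings behind the jar-upload mechanism.
datalake_bucket = 'example-datalake'      # placeholder for the real datalake bucket
aws_secret_name = 'example-aws-secret'    # placeholder Kubernetes secret holding the S3 keys

normalize_spark_conf_sketch = {
    # the local client jar is uploaded here by spark-submit, hence the client needs S3 credentials
    'spark.kubernetes.file.upload.path': f's3a://{datalake_bucket}/dependencies',
    # Ferlab image based on apache/spark:3.3.2 with hadoop-aws already in the Spark jars directory
    'spark.kubernetes.container.image': 'ferlabcrsj/spark:469f2fc61f06fcfa73c1480a5e5f73f59768152d',
    # driver and executors read their S3 credentials from the Kubernetes secret
    'spark.kubernetes.driver.secretKeyRef.AWS_ACCESS_KEY_ID': f'{aws_secret_name}:access_key',
    'spark.kubernetes.driver.secretKeyRef.AWS_SECRET_ACCESS_KEY': f'{aws_secret_name}:secret_key',
}
application_jar = 'file:///app/variant-normalize-task.jar'  # shipped inside the ETL client image
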
4 changes: 2 additions & 2 deletions dags/etl_prepare_index.py
@@ -1,9 +1,9 @@
from airflow import DAG
from airflow.models.param import Param
from datetime import datetime
from lib.config import default_config_file, etl_base_config, spark_small_conf, prepare_index_jar, study_ids
from lib.config import default_config_file, etl_deps_config, spark_small_conf, prepare_index_jar, study_ids

etl_prepare_config = etl_base_config \
etl_prepare_config = etl_deps_config \
.add_spark_conf(spark_small_conf) \
.with_spark_jar(prepare_index_jar) \
.with_spark_class('bio.ferlab.fhir.etl.PrepareIndex') \
17 changes: 7 additions & 10 deletions dags/lib/config.py
@@ -1,10 +1,7 @@
import kubernetes
from airflow.exceptions import AirflowConfigException
from airflow.models import Variable, Param
from lib.operators.spark import SparkOperatorConfig

from lib.operators.base_kubernetes import KubeConfig
from lib.operators.fhavro import FhavroConfig
from lib.operators.fhir_import import FhirCsvConfig
from lib.operators.spark import SparkOperatorConfig


class Env:
@@ -45,7 +42,6 @@ class Env:
study_id = '{{ params.study_id }}'
study_ids = '{{ params.study_ids }}'
project = '{{ params.project }}'
project = '{{ params.project }}'
dataset = '{{ params.dataset }}'
batch = '{{ params.batch }}'
release_id = '{{ params.release_id }}'
@@ -60,7 +56,6 @@ class Env:
)

spark_default_conf = {
'spark.jars.packages': 'org.apache.hadoop:hadoop-aws:3.3.4,io.delta:delta-core_2.12:2.4.0',
'spark.sql.shuffle.partitions': '1000',
'spark.sql.extensions': 'io.delta.sql.DeltaSparkSessionExtension',
'spark.sql.catalog.spark_catalog': 'org.apache.spark.sql.delta.catalog.DeltaCatalog',
@@ -144,15 +139,17 @@ class Env:
is_delete_operator_pod=False
)

etl_index_config = etl_base_config \
etl_deps_config = etl_base_config.add_packages('org.apache.hadoop:hadoop-aws:3.3.4','io.delta:delta-core_2.12:2.4.0')

etl_index_config = etl_deps_config \
.add_spark_conf(spark_small_conf, spark_index_conf) \
.with_spark_jar(index_jar)

etl_publish_config = etl_base_config \
etl_publish_config = etl_deps_config \
.add_spark_conf(spark_small_conf, spark_index_conf) \
.with_spark_jar(publish_jar) \
.with_spark_class('bio.ferlab.fhir.etl.PublishTask')

etl_variant_config = etl_base_config \
etl_variant_config = etl_deps_config \
.add_spark_conf(spark_large_conf) \
.with_spark_jar(variant_jar)
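
The config.py change above is the heart of the commit: the hadoop-aws and delta-core packages move out of spark_default_conf and into a derived etl_deps_config, so the other DAGs keep receiving them while etl-normalize-variants can stay on a package-free etl_base_config (its Glow 1.2.1 stack ships its own jars). The snippet below is a hypothetical, simplified stand-in for that idea, not the actual SparkOperatorConfig implementation from lib/operators/spark.py.

from dataclasses import dataclass, replace
from typing import Tuple

@dataclass(frozen=True)
class SparkConfigSketch:
    packages: Tuple[str, ...] = ()

    def add_packages(self, *packages: str) -> 'SparkConfigSketch':
        # return a derived config instead of mutating the shared base,
        # so DAGs that keep using the base are unaffected
        return replace(self, packages=self.packages + packages)

etl_base_sketch = SparkConfigSketch()
etl_deps_sketch = etl_base_sketch.add_packages(
    'org.apache.hadoop:hadoop-aws:3.3.4', 'io.delta:delta-core_2.12:2.4.0')
assert etl_base_sketch.packages == ()  # the variant-normalization DAG still builds on a package-free base
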
