Skip to content

Commit

Permalink
fix: CQDG-759 remove arranger project update and refactor publish tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
adipaul1981 committed Jul 29, 2024
1 parent 4f739d7 commit 9cbfc3c
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 100 deletions.
1 change: 0 additions & 1 deletion .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ ID_SERVICE_URL=http://id-service:5000
OBO_PARSER_IMAGE=index.docker.io/ferlabcrsj/obo-parser
ETL_IMAGE=index.docker.io/ferlabcrsj/etl-cqdg-portal
FHAVRO_EXPORT_IMAGE=index.docker.io/ferlabcrsj/fhavro-export
ARRANGER_IMAGE=index.docker.io/ferlabcrsj/cqdg-api-arranger
FHIR_IMPORT_IMAGE=index.docker.io/ferlabcrsj/cqdg-fhir-import
FERLOAD_DRS_IMPORT_IMAGE=index.docker.io/ferlabcrsj/cqdg-ferload-drs-import
ENVIRONMENT=qa
36 changes: 0 additions & 36 deletions dags/arranger.py

This file was deleted.

22 changes: 6 additions & 16 deletions dags/etl.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
from datetime import datetime

from airflow import DAG
from airflow.models.param import Param
from airflow.utils.task_group import TaskGroup
from datetime import datetime
from lib.config import env, study_codes, release_id, es_port, es_url, etl_publish_config, project

from es_templates_update import es_templates_update
from etl_import import etl_import
from arranger import arranger_task
from etl_fhavro_export import fhavro_export
from etl_import import etl_import
from etl_index import index_operator
from etl_prepare_index import prepare_index
from etl_publish import publish_task

with DAG(
dag_id='etl',
Expand All @@ -24,16 +25,5 @@

with TaskGroup(group_id='index') as index:
index_operator('study') >> index_operator('participant') >> index_operator('file') >> index_operator('biospecimen')

with TaskGroup(group_id='publish') as publish:
def publish_operator(name:str):
return etl_publish_config \
.args(es_url, es_port, env, project, release_id, study_codes, f'{name}_centric') \
.operator(
task_id=f'{name}_centric',
name=f'etl-publish-{name}-centric'
)

publish_operator('study') >> publish_operator('participant') >> publish_operator('file') >> publish_operator('biospecimen')

fhavro_export() >> etl_import() >> prepare_index() >> es_templates_update() >> index >> publish >> arranger_task()
fhavro_export() >> etl_import() >> prepare_index() >> es_templates_update() >> index >> publish_task('study_centric,participant_centric,file_centric,biospecimen_centric')
42 changes: 42 additions & 0 deletions dags/etl_publish.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from airflow import DAG
from airflow.models import Param, Variable
from datetime import datetime
from lib.config import kube_config, es_url, es_port, es_credentials_secret_name, \
es_credentials_secret_key_password, es_credentials_secret_key_username, release_id, study_codes
from lib.operators.publish import PublishConfig

# Shared configuration for the Elasticsearch publish job. The container image
# is resolved from the Airflow Variable store at DAG-parse time; credentials
# and the CA certificate are injected from Kubernetes secrets.
etl_publish_config = PublishConfig(
    es_url=es_url,
    es_port=es_port,
    kube_config=kube_config,
    image=Variable.get('publish_image'),
    es_cert_secret_name='opensearch-ca-certificate',
    es_credentials_secret_name=es_credentials_secret_name,
    es_credentials_secret_key_username=es_credentials_secret_key_username,
    es_credentials_secret_key_password=es_credentials_secret_key_password,
)

def publish_task(job_types: str):
    """Build the Kubernetes operator that runs the ES publish job.

    job_types -- comma-separated job names passed through to the publish
    container (e.g. 'study_centric,participant_centric').
    """
    # Remaining arguments come from module-level config (lib.config).
    job_args = [es_url, es_port, release_id, study_codes, job_types]
    return etl_publish_config.args(*job_args).operator(
        task_id='etl_publish',
        name='etl-publish',
    )
# Standalone DAG: publish already-indexed data without rerunning the full ETL.
with DAG(
    dag_id='etl-publish',
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,  # manual trigger only
    params={
        'es_port': Param('9200', type='string'),
        'release_id': Param('0', type='string'),
        'study_codes': Param('study1', type='string'),
        # Comma-separated list of index job types to publish.
        'job_types': Param(
            'study_centric,participant_centric,file_centric,biospecimen_centric',
            type='string',
        ),
    },
) as dag:
    # job_types is a Jinja template resolved from the run parameters.
    publish_task('{{ params.job_types }}')

5 changes: 0 additions & 5 deletions dags/lib/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,6 @@ class Env:
.add_spark_conf(spark_small_conf, spark_index_conf) \
.with_spark_jar(index_jar)

etl_publish_config = etl_base_config \
.add_spark_conf(spark_small_conf, spark_index_conf) \
.with_spark_jar(publish_jar) \
.with_spark_class('bio.ferlab.fhir.etl.PublishTask')

etl_variant_config = etl_base_config \
.add_spark_conf(spark_large_conf) \
.with_spark_jar(variant_jar)
52 changes: 11 additions & 41 deletions dags/lib/operators/arranger.py → dags/lib/operators/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,31 @@
from lib.operators.base_kubernetes import BaseKubernetesOperator, BaseConfig, required


class ArrangerOperator(BaseKubernetesOperator):
class PublishOperator(BaseKubernetesOperator):
def __init__(
self,
node_environment: str,
es_url: str,
es_port: Optional[str] = '9200',
es_cert_secret_name: Optional[str] = None,
es_cert_file: Optional[str] = 'ca.crt',
es_credentials_secret_name: Optional[str] = None,
es_credentials_secret_key_username: Optional[str] = 'username',
es_credentials_secret_key_password: Optional[str] = 'password',
keycloak_client_secret_name: Optional[str] = None,
keycloak_client_secret_key: Optional[str] = 'client-secret',
**kwargs,
) -> None:
super().__init__(
**kwargs
)
self.node_environment = node_environment
self.es_url = es_url
self.es_port = es_port
self.es_cert_secret_name = es_cert_secret_name
self.es_cert_file = es_cert_file
self.es_credentials_secret_name = es_credentials_secret_name
self.es_credentials_secret_key_username = es_credentials_secret_key_username
self.es_credentials_secret_key_password = es_credentials_secret_key_password
self.keycloak_client_secret_name = keycloak_client_secret_name
self.keycloak_client_secret_key = keycloak_client_secret_key

def execute(self, **kwargs):

self.env_vars.append(
k8s.V1EnvVar(
name='NODE_ENV',
value=self.node_environment,
)
)
self.env_vars.append(
k8s.V1EnvVar(
name='ES_HOST',
Expand All @@ -51,7 +39,7 @@ def execute(self, **kwargs):
if self.es_credentials_secret_name:
self.env_vars.append(
k8s.V1EnvVar(
name='ES_USER',
name='ES_USERNAME',
value_from=k8s.V1EnvVarSource(
secret_key_ref=k8s.V1SecretKeySelector(
name=self.es_credentials_secret_name,
Expand All @@ -61,7 +49,7 @@ def execute(self, **kwargs):
)
self.env_vars.append(
k8s.V1EnvVar(
name='ES_PASS',
name='ES_PASSWORD',
value_from=k8s.V1EnvVarSource(
secret_key_ref=k8s.V1SecretKeySelector(
name=self.es_credentials_secret_name,
Expand All @@ -70,19 +58,6 @@ def execute(self, **kwargs):
)
)

if self.keycloak_client_secret_name:
self.env_vars.append(
k8s.V1EnvVar(
name='KEYCLOAK_CLIENT_SECRET',
value_from=k8s.V1EnvVarSource(
secret_key_ref=k8s.V1SecretKeySelector(
name=self.keycloak_client_secret_name,
key=self.keycloak_client_secret_key,
),
),
)
)

if self.es_cert_secret_name:
self.volumes.append(
k8s.V1Volume(
Expand All @@ -100,30 +75,25 @@ def execute(self, **kwargs):
read_only=True,
),
)
self.env_vars.append(
k8s.V1EnvVar(
name='NODE_EXTRA_CA_CERTS',
value=f'/opt/opensearch-ca/{self.es_cert_file}',
)
)

self.cmds = ['node']
self.cmds = ['java',
'-cp',
'publish-task.jar',
'bio.ferlab.fhir.etl.PublishTask'
]

super().execute(**kwargs)


@dataclass
class ArrangerConfig(BaseConfig):
node_environment: str = required() # we need a default value because BaseConfig has some default fields. See
# https://stackoverflow.com/questions/51575931/class-inheritance-in-python-3-7-dataclasses
class PublishConfig(BaseConfig):
es_url: Optional[str] = required()
es_port: Optional[str] = '9200'
es_cert_secret_name: Optional[str] = None
es_cert_file: Optional[str] = 'ca.crt'
es_credentials_secret_name: Optional[str] = None
es_credentials_secret_key_username: Optional[str] = 'username'
es_credentials_secret_key_password: Optional[str] = 'password'
keycloak_client_secret_name: Optional[str] = None
keycloak_client_secret_key: Optional[str] = 'client-secret'

def operator(self, class_to_instantiate: Type[ArrangerOperator] = ArrangerOperator, **kwargs) -> ArrangerOperator:
def operator(self, class_to_instantiate: Type[PublishOperator] = PublishOperator, **kwargs) -> PublishOperator:
return super().build_operator(class_to_instantiate=class_to_instantiate, **kwargs)
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ x-airflow-common:
AIRFLOW_VAR_OBO_PARSER_IMAGE: ${OBO_PARSER_IMAGE}
AIRFLOW_VAR_ETL_IMAGE: ${ETL_IMAGE}
AIRFLOW_VAR_FHAVRO_EXPORT_IMAGE: ${FHAVRO_EXPORT_IMAGE}
AIRFLOW_VAR_ARRANGER_IMAGE: ${ARRANGER_IMAGE}
AIRFLOW_VAR_PUBLISH_IMAGE: ${PUBLISH_IMAGE}
AIRFLOW_VAR_FHIR_IMPORT_IMAGE: ${FHIR_IMPORT_IMAGE}
AIRFLOW_VAR_FERLOAD_DRS_IMPORT_IMAGE: ${FERLOAD_DRS_IMPORT_IMAGE}
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
Expand Down

0 comments on commit 9cbfc3c

Please sign in to comment.