diff --git a/dags/veda_data_pipeline/groups/processing_tasks.py b/dags/veda_data_pipeline/groups/processing_tasks.py index b940bdc..2dc48a4 100644 --- a/dags/veda_data_pipeline/groups/processing_tasks.py +++ b/dags/veda_data_pipeline/groups/processing_tasks.py @@ -35,10 +35,10 @@ def submit_to_stac_ingestor_task(built_stac: dict): ) return event + @task(max_active_tis_per_dag=5) def build_stac_task(payload): from veda_data_pipeline.utils.build_stac.handler import stac_handler airflow_vars_json = Variable.get("aws_dags_variables", deserialize_json=True) event_bucket = airflow_vars_json.get("EVENT_BUCKET") return stac_handler(payload_src=payload, bucket_output=event_bucket) - diff --git a/dags/veda_data_pipeline/veda_dataset_pipeline.py b/dags/veda_data_pipeline/veda_dataset_pipeline.py index 78a38a0..5302c27 100644 --- a/dags/veda_data_pipeline/veda_dataset_pipeline.py +++ b/dags/veda_data_pipeline/veda_dataset_pipeline.py @@ -11,19 +11,20 @@ "collection": "", "data_type": "cog", "description": "", - "discovery_items": [ - { - "bucket": "", - "datetime_range": "", - "discovery": "s3", - "filename_regex": "", - "prefix": "", - } - ], - "is_periodic": "", + "discovery_items": + [ + { + "bucket": "", + "datetime_range": "", + "discovery": "s3", + "filename_regex": "", + "prefix": "" + } + ], + "is_periodic": Param(True, type="boolean"), "license": "", "time_density": "", - "title": "", + "title": "" } dag_doc_md = f""" @@ -44,25 +45,7 @@ "tags": ["collection", "discovery"], } -template_dag_run_conf = { - "collection": "", - "data_type": "cog", - "description": "", - "discovery_items": - [ - { - "bucket": "", - "datetime_range": "", - "discovery": "s3", - "filename_regex": "", - "prefix": "" - } - ], - "is_periodic": "", - "license": "", - "time_density": "", - "title": "" -} + with DAG("veda_dataset_pipeline", params=template_dag_run_conf, **dag_args) as dag: start = EmptyOperator(task_id="start") diff --git a/dags/veda_data_pipeline/veda_discover_pipeline.py b/dags/veda_data_pipeline/veda_discover_pipeline.py index e7af4c7..75df761 100644 --- a/dags/veda_data_pipeline/veda_discover_pipeline.py +++ b/dags/veda_data_pipeline/veda_discover_pipeline.py @@ -1,9 +1,6 @@ import pendulum from airflow import DAG from airflow.operators.dummy_operator import DummyOperator -from airflow.decorators import task -from airflow.models.variable import Variable -import json from veda_data_pipeline.groups.discover_group import discover_from_s3_task, get_files_task from veda_data_pipeline.groups.processing_tasks import submit_to_stac_ingestor_task, build_stac_task