Skip to content

Commit

Permalink
Refactor Dataset Pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
amarouane-ABDELHAK committed Dec 6, 2024
1 parent 53416d0 commit 5a41003
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 34 deletions.
2 changes: 1 addition & 1 deletion dags/veda_data_pipeline/groups/processing_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ def submit_to_stac_ingestor_task(built_stac: dict):
)
return event


@task(max_active_tis_per_dag=5)
def build_stac_task(payload):
from veda_data_pipeline.utils.build_stac.handler import stac_handler
airflow_vars_json = Variable.get("aws_dags_variables", deserialize_json=True)
event_bucket = airflow_vars_json.get("EVENT_BUCKET")
return stac_handler(payload_src=payload, bucket_output=event_bucket)

43 changes: 13 additions & 30 deletions dags/veda_data_pipeline/veda_dataset_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,20 @@
"collection": "<collection-id>",
"data_type": "cog",
"description": "<collection-description>",
"discovery_items": [
{
"bucket": "<bucket-name>",
"datetime_range": "<range>",
"discovery": "s3",
"filename_regex": "<regex>",
"prefix": "<example-prefix/>",
}
],
"is_periodic": "<true|false>",
"discovery_items":
[
{
"bucket": "<bucket-name>",
"datetime_range": "<range>",
"discovery": "s3",
"filename_regex": "<regex>",
"prefix": "<example-prefix/>"
}
],
"is_periodic": Param(True, type="boolean"),
"license": "<collection-LICENSE>",
"time_density": "<time-density>",
"title": "<collection-title>",
"title": "<collection-title>"
}

dag_doc_md = f"""
Expand All @@ -44,25 +45,7 @@
"tags": ["collection", "discovery"],
}

template_dag_run_conf = {
"collection": "<collection-id>",
"data_type": "cog",
"description": "<collection-description>",
"discovery_items":
[
{
"bucket": "<bucket-name>",
"datetime_range": "<range>",
"discovery": "s3",
"filename_regex": "<regex>",
"prefix": "<example-prefix/>"
}
],
"is_periodic": "<true|false>",
"license": "<collection-LICENSE>",
"time_density": "<time-density>",
"title": "<collection-title>"
}


with DAG("veda_dataset_pipeline", params=template_dag_run_conf, **dag_args) as dag:
start = EmptyOperator(task_id="start")
Expand Down
3 changes: 0 additions & 3 deletions dags/veda_data_pipeline/veda_discover_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import pendulum
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.decorators import task
from airflow.models.variable import Variable
import json
from veda_data_pipeline.groups.discover_group import discover_from_s3_task, get_files_task
from veda_data_pipeline.groups.processing_tasks import submit_to_stac_ingestor_task, build_stac_task

Expand Down

0 comments on commit 5a41003

Please sign in to comment.