Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing collecton ingest DAG #261

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ data products and STAC metadata for interfaces such as https://github.com/NASA-I

First time setting up the repo:
`git submodule update --init --recursive`

Afterwards:
`git submodule update --recursive --remote`

Expand Down
5 changes: 2 additions & 3 deletions dags/generate_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@ def generate_dags():

from pathlib import Path

airflow_vars = Variable.get("aws_dags_variables")
airflow_vars_json = json.loads(airflow_vars)
bucket = airflow_vars_json.get("EVENT_BUCKET")
airflow_vars = Variable.get("aws_dags_variables", default_var={}, deserialize_json=True)
bucket = airflow_vars.get("EVENT_BUCKET")

try:
client = boto3.client("s3")
Expand Down
1 change: 0 additions & 1 deletion dags/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ apache-airflow-providers-postgres==5.2.2
apache-airflow-providers-common-sql==1.2.0
typing-extensions==4.4.0
psycopg2-binary==2.9.5
pypgstac==0.7.4
pyOpenSSL==22.0.0
stac-pydantic
fsspec
Expand Down
80 changes: 37 additions & 43 deletions dags/veda_data_pipeline/groups/collection_group.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import requests
from airflow.models.variable import Variable
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from airflow.decorators import task
from veda_data_pipeline.utils.collection_generation import GenerateCollection
from veda_data_pipeline.utils.submit_stac import submission_handler

generator = GenerateCollection()


def check_collection_exists(endpoint: str, collection_id: str):
"""
Expand All @@ -24,27 +22,7 @@ def check_collection_exists(endpoint: str, collection_id: str):
)


def ingest_collection_task(ti):
"""
Ingest a collection into the STAC catalog

Args:
dataset (Dict[str, Any]): dataset dictionary (JSON)
role_arn (str): role arn for Zarr collection generation
"""
import json
collection = ti.xcom_pull(task_ids='Collection.generate_collection')
airflow_vars = Variable.get("aws_dags_variables")
airflow_vars_json = json.loads(airflow_vars)
cognito_app_secret = airflow_vars_json.get("COGNITO_APP_SECRET")
stac_ingestor_api_url = airflow_vars_json.get("STAC_INGESTOR_API_URL")

return submission_handler(
event=collection,
endpoint="/collections",
cognito_app_secret=cognito_app_secret,
stac_ingestor_api_url=stac_ingestor_api_url
)


# NOTE unused, but useful for item ingests, since collections are a dependency for items
Expand All @@ -60,32 +38,48 @@ def check_collection_exists_task(ti):
)


def generate_collection_task(ti):
import json
config = ti.dag_run.conf
airflow_vars = Variable.get("aws_dags_variables")
airflow_vars_json = json.loads(airflow_vars)
role_arn = airflow_vars_json.get("ASSUME_ROLE_READ_ARN")

# TODO it would be ideal if this also works with complete collections where provided - this would make the collection ingest more re-usable
collection = generator.generate_stac(
dataset_config=config, role_arn=role_arn
)
return collection



group_kwgs = {"group_id": "Collection", "tooltip": "Collection"}


def collection_task_group():
with TaskGroup(**group_kwgs) as collection_task_grp:
generate_collection = PythonOperator(
task_id="generate_collection", python_callable=generate_collection_task
)
ingest_collection = PythonOperator(
task_id="ingest_collection", python_callable=ingest_collection_task
)
generate_collection >> ingest_collection
@task()
def generate_collection_task(ti):

config = ti.dag_run.conf
airflow_vars_json = Variable.get("aws_dags_variables", deserialize_json=True)
role_arn = airflow_vars_json.get("ASSUME_ROLE_READ_ARN")

# TODO it would be ideal if this also works with complete collections where provided - this would make the collection ingest more re-usable
generator = GenerateCollection()
collection = generator.generate_stac(
dataset_config=config, role_arn=role_arn
)
return collection

@task()
def ingest_collection_task(collection):
"""
Ingest a collection into the STAC catalog

Args:
collection:

"""
airflow_vars_json = Variable.get("aws_dags_variables", deserialize_json=True)
cognito_app_secret = airflow_vars_json.get("COGNITO_APP_SECRET")
stac_ingestor_api_url = airflow_vars_json.get("STAC_INGESTOR_API_URL")

return submission_handler(
event=collection,
endpoint="/collections",
cognito_app_secret=cognito_app_secret,
stac_ingestor_api_url=stac_ingestor_api_url
)
Comment on lines +48 to +80
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's define these task() functions outside of the TaskGroup, so they can be re-used on their own as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^^ this is a breaking change for stactools, which come with their own collection generators, but re-use our collection submission task


collection = generate_collection_task()
ingest_collection_task(collection)

return collection_task_grp
32 changes: 32 additions & 0 deletions dags/veda_data_pipeline/groups/discover_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from veda_data_pipeline.utils.s3_discovery import (
s3_discovery_handler, EmptyFileListError
)
from deprecated import deprecated

group_kwgs = {"group_id": "Discover", "tooltip": "Discover"}

Expand Down Expand Up @@ -48,6 +49,36 @@ def discover_from_s3_task(ti=None, event={}, **kwargs):


@task
def get_files_task(payload, ti=None):
"""
Get files from S3 produced by discovery or dataset tasks.
Handles both single payload and multiple payload scenarios.
"""
Comment on lines +52 to +56
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚀, this is much nicer than what we were doing before

dag_run_id = ti.dag_run.run_id
results = []

# Handle multiple payloads (dataset and items case)
payloads = payload if isinstance(payload, list) else [payload]

for item in payloads:
if isinstance(item, LazyXComAccess): # Dynamic task mapping case
payloads_xcom = item[0].pop("payload", [])
base_payload = item[0]
else:
payloads_xcom = item.pop("payload", [])
base_payload = item

for indx, payload_xcom in enumerate(payloads_xcom):
results.append({
"run_id": f"{dag_run_id}_{uuid.uuid4()}_{indx}",
**base_payload,
"payload": payload_xcom,
})

return results

@task
@deprecated(reason="Please use get_files_task function that hundles both files and dataset files use cases")
def get_files_to_process(payload, ti=None):
"""Get files from S3 produced by the discovery task.
Used as part of both the parallel_run_process_rasters and parallel_run_process_vectors tasks.
Expand All @@ -66,6 +97,7 @@ def get_files_to_process(payload, ti=None):


@task
@deprecated(reason="Please use get_files_task airflow task instead. This will be removed in the new release")
def get_dataset_files_to_process(payload, ti=None):
"""Get files from S3 produced by the dataset task.
This is different from the get_files_to_process task as it produces a combined structure from repeated mappings.
Expand Down
5 changes: 2 additions & 3 deletions dags/veda_data_pipeline/utils/collection_generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,13 @@ def create_cog_collection(self, dataset: Dict[str, Any]) -> dict:

# Override the extents if they exists
if spatial_extent := dataset.get("spatial_extent"):
collection_stac["extent"]["spatial"] = {"bbox": [list(spatial_extent.values())]},
collection_stac["extent"]["spatial"] = {"bbox": [list(spatial_extent.values())]}

if temporal_extent := dataset.get("temporal_extent"):
collection_stac["extent"]["temporal"] = {
"interval": [
# most of our data uses the Z suffix for UTC - isoformat() doesn't
[
datetime.fromisoformat(x).astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
x
if x else None
for x in list(temporal_extent.values())
]
Expand Down
4 changes: 2 additions & 2 deletions dags/veda_data_pipeline/utils/submit_stac.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def submission_handler(
cognito_app_secret=None,
stac_ingestor_api_url=None,
context=None,
) -> None:
) -> [Dict[str, Any], None]:
if context is None:
context = {}

Expand All @@ -121,7 +121,7 @@ def submission_handler(
secret_id=cognito_app_secret,
base_url=stac_ingestor_api_url,
)
ingestor.submit(event=stac_item, endpoint=endpoint)
return ingestor.submit(event=stac_item, endpoint=endpoint)


if __name__ == "__main__":
Expand Down
22 changes: 11 additions & 11 deletions dags/veda_data_pipeline/veda_collection_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,19 @@
}

template_dag_run_conf = {
"collection": "<collection-id>",
"data_type": "cog",
"description": "<collection-description>",
"is_periodic": "<true|false>",
"license": "<collection-LICENSE>",
"time_density": "<time-density>",
"title": "<collection-title>"
"collection": "<collection-id>",
"data_type": "cog",
"description": "<collection-description>",
"is_periodic": "<true|false>",
"license": "<collection-LICENSE>",
"time_density": "<time-density>",
"title": "<collection-title>",
}

with DAG("veda_collection_pipeline", params=template_dag_run_conf, **dag_args) as dag:
start = EmptyOperator(task_id="start", dag=dag)
end = EmptyOperator(task_id="end", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, dag=dag)

collection_grp = collection_task_group()
end = EmptyOperator(
task_id="end", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, dag=dag
)

start >> collection_grp >> end
start >> collection_task_group() >> end
108 changes: 54 additions & 54 deletions dags/veda_data_pipeline/veda_dataset_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,37 @@
import pendulum
from airflow import DAG
from airflow.models.param import Param
from airflow.decorators import task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator as EmptyOperator
from airflow.models.variable import Variable
import json
from veda_data_pipeline.groups.collection_group import collection_task_group
from veda_data_pipeline.groups.discover_group import discover_from_s3_task, get_dataset_files_to_process
from veda_data_pipeline.groups.processing_tasks import submit_to_stac_ingestor_task

dag_doc_md = """
template_dag_run_conf = {
"collection": "<collection-id>",
"data_type": "cog",
"description": "<collection-description>",
"discovery_items": [
{
"bucket": "<bucket-name>",
"datetime_range": "<range>",
"discovery": "s3",
"filename_regex": "<regex>",
"prefix": "<example-prefix/>",
}
],
"is_periodic": "<true|false>",
"license": "<collection-LICENSE>",
"time_density": "<time-density>",
"title": "<collection-title>",
}

dag_doc_md = f"""
### Dataset Pipeline
Generates a collection and triggers the file discovery process
#### Notes
- This DAG can run with the following configuration <br>
```json
{
"collection": "collection-id",
"data_type": "cog",
"description": "collection description",
"discovery_items":
[
{
"bucket": "veda-data-store-staging",
"datetime_range": "year",
"discovery": "s3",
"filename_regex": "^(.*).tif$",
"prefix": "example-prefix/"
}
],
"is_periodic": true,
"license": "collection-LICENSE",
"time_density": "year",
"title": "collection-title"
}
{template_dag_run_conf}
```
"""

Expand All @@ -44,24 +43,6 @@
"tags": ["collection", "discovery"],
}


@task
def extract_discovery_items(**kwargs):
ti = kwargs.get("ti")
discovery_items = ti.dag_run.conf.get("discovery_items")
print(discovery_items)
return discovery_items


@task(max_active_tis_per_dag=3)
def build_stac_task(payload):
from veda_data_pipeline.utils.build_stac.handler import stac_handler
airflow_vars = Variable.get("aws_dags_variables")
airflow_vars_json = json.loads(airflow_vars)
event_bucket = airflow_vars_json.get("EVENT_BUCKET")
return stac_handler(payload_src=payload, bucket_output=event_bucket)


template_dag_run_conf = {
"collection": "<collection-id>",
"data_type": "cog",
Expand All @@ -83,19 +64,38 @@ def build_stac_task(payload):
}

with DAG("veda_dataset_pipeline", params=template_dag_run_conf, **dag_args) as dag:
# ECS dependency variable
start = EmptyOperator(task_id="start")
end = EmptyOperator(task_id="end")

start = EmptyOperator(task_id="start", dag=dag)
end = EmptyOperator(task_id="end", dag=dag)

collection_grp = collection_task_group()
discover = discover_from_s3_task.expand(event=extract_discovery_items())
discover.set_upstream(collection_grp) # do not discover until collection exists
get_files = get_dataset_files_to_process(payload=discover)
@task()
def mutate_payload(**kwargs):
ti = kwargs.get("ti")
payload = ti.dag_run.conf.copy()
payloads = list()
if assets := payload.get("assets"):
# remove thumbnail asset if provided in collection config
if "thumbnail" in assets.keys():
assets.pop("thumbnail")
# if thumbnail was only asset, delete assets
if not assets:
payload.pop("assets")
# finally put the mutated assets back in the payload
else:
payload["assets"] = assets
for item in payload.get("discovery_items"):
payloads.append({
**payload,
**item
}
)
ividito marked this conversation as resolved.
Show resolved Hide resolved

return payloads

build_stac = build_stac_task.expand(payload=get_files)
# .output is needed coming from a non-taskflow operator
submit_stac = submit_to_stac_ingestor_task.expand(built_stac=build_stac)

collection_grp.set_upstream(start)
submit_stac.set_downstream(end)
mutated_payloads = start >> collection_task_group() >> mutate_payload()
run_discover_build_and_push = TriggerDagRunOperator.partial(
task_id="trigger_discover_items_dag",
trigger_dag_id="veda_discover",
wait_for_completion=True
).expand(conf=mutated_payloads) >> end
Comment on lines +97 to +101
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get that this promotes reuseability at the level of the discover DAG, but I worry that this creates too much of a breakdown in reuse and observability at the task level.

  • Rather than passing task-specific data structures as parameters, we end up relying on ti.dag_run.conf. This means that, unless we always build and ingest items through the discover DAG, we cannot rely on the data format being the same for different DAGs with different ingestion strategies. This will eventually bring us back to the same solution we have now, where new DAGs skip using the discover DAG, and simply reuse the tasks with additional wrapper steps to manipulate the input. We should try to lean into this - the changes you made to get_files() is a great example, where we promote modularity and reuse at the task level, so that it doesn't matter what the incoming dag_run.conf is.
  • Similarly, this condenses task status into a single node in the dataset DAG. Meanwhile, in the triggered discover DAG, there are several expanded steps with unique failure conditions. This means that a failure in one step in one triggered DAG will require a retry of the complete DAG, rather than a single task.
  • TriggerDagRunOperator adds a link between DAGs, but not between executions. I was hoping this would work better with expand(), but unfortunately we're out of luck. This (and a few similar issues) is being tracked as a bug in Airflow, but until it's fixed, I think we should steer clear so as to maintain execution-level observability, especially in DAG runs with a large number of mapped discovery tasks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree that traceability will be necessary for debugging any issues. Some have reported that this problem is fixed in Airflow 2.10. However, we still need to promote task reusability to avoid defining the same logic twice. I will close this PR and open a new one with some refactoring to make debugging and development easier.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Closing this in favor of #268

Loading
Loading