Fixing collection ingest DAG #261

Closed · wants to merge 19 commits
Conversation

amarouane-ABDELHAK (Contributor)

Summary

Fixing collection ingest

Changes

  • Ingest Collection DAG

ividito (Contributor) left a comment:

Is there additional context to this PR? I didn't know we had issues with collection ingests.

Comment on lines 56 to 61
run_discover_build_and_push = TriggerMultiDagRunOperator(
    task_id="trigger_discover_items_dag",
    dag=dag,
    trigger_dag_id="veda_discover",
    python_callable=trigger_discover_and_build_task,
)
Contributor:

We should not use this operator, as it creates a disconnect between the original DAGRun event and the subsequent DAG runs (i.e. a failure in an instance of veda-discover will not feed back to the original veda-dataset DAG when using this operator). We could instead describe the discover pipeline in a TaskGroup and call expand() on it, which will map task instances to the correct event.
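For reference, a rough sketch of the TaskGroup + expand() pattern being suggested, assuming Airflow 2.5+ (where task groups can be mapped); the group, task, and DAG names (discover_group, discover_items, build_stac_items, get_payloads, veda_dataset_example) are placeholders, not the actual VEDA tasks:

    from pendulum import datetime

    from airflow.decorators import dag, task, task_group


    @task
    def discover_items(payload: dict) -> list:
        # Placeholder: discover the assets described by one payload.
        return [payload]


    @task
    def build_stac_items(discovered: list) -> list:
        # Placeholder: build STAC items for the discovered assets.
        return discovered


    @task_group
    def discover_group(payload: dict):
        # Mapped group instances stay attached to the triggering DAG run,
        # so a failure here is visible from the parent DAG's grid view.
        build_stac_items(discover_items(payload))


    @dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
    def veda_dataset_example():
        @task
        def get_payloads(dag_run=None) -> list:
            # Placeholder: split dag_run.conf into one payload per discovery.
            return [dag_run.conf]

        # One mapped instance of the whole discover pipeline per payload.
        discover_group.expand(payload=get_payloads())


    veda_dataset_example()

Unlike TriggerMultiDagRunOperator, the mapped instances show up (and can be retried) inside the same DAG run that carried the original conf.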

Contributor (Author):

I want to re-use the discover_build_ingest DAG instead of redefining the same tasks, but having a TaskGroup used in both DAGs is not a bad idea.

@@ -41,7 +41,9 @@ COPY --chown=airflow:airflow scripts "${AIRFLOW_HOME}/scripts"

RUN cp ${AIRFLOW_HOME}/configuration/airflow.cfg* ${AIRFLOW_HOME}/.

RUN pip install pypgstac==0.7.4
# Commented out because it downgrades pydantic to v1
Contributor:

This is an artifact; we should not need pypgstac in Airflow at all, since that should be entirely handled by the ingest API.

Comment on lines +48 to +80
@task()
def generate_collection_task(ti):
    config = ti.dag_run.conf
    airflow_vars_json = Variable.get("aws_dags_variables", deserialize_json=True)
    role_arn = airflow_vars_json.get("ASSUME_ROLE_READ_ARN")

    # TODO it would be ideal if this also works with complete collections where provided - this would make the collection ingest more re-usable
    generator = GenerateCollection()
    collection = generator.generate_stac(
        dataset_config=config, role_arn=role_arn
    )
    return collection

@task()
def ingest_collection_task(collection):
    """
    Ingest a collection into the STAC catalog

    Args:
        collection:
    """
    airflow_vars_json = Variable.get("aws_dags_variables", deserialize_json=True)
    cognito_app_secret = airflow_vars_json.get("COGNITO_APP_SECRET")
    stac_ingestor_api_url = airflow_vars_json.get("STAC_INGESTOR_API_URL")

    return submission_handler(
        event=collection,
        endpoint="/collections",
        cognito_app_secret=cognito_app_secret,
        stac_ingestor_api_url=stac_ingestor_api_url
    )
Contributor:

Let's define these task() functions outside of the TaskGroup, so they can be re-used on their own as well.
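One possible shape of that suggestion, sketched with the task names from the diff above; the @task bodies are elided and the collection_task_group wrapper name is hypothetical:

    from airflow.decorators import task, task_group


    @task()
    def generate_collection_task(ti=None):
        # Module-level, so other DAGs can import and re-use it on its own.
        ...


    @task()
    def ingest_collection_task(collection):
        # Module-level, so e.g. stactools DAGs can keep using it with their
        # own collection generators.
        ...


    @task_group(group_id="collection_task_group")
    def collection_task_group():
        # The group only wires the shared tasks together for the dataset DAG.
        ingest_collection_task(generate_collection_task())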

Contributor:

^^ this is a breaking change for stactools, which come with their own collection generators, but re-use our collection submission task

Comment on lines 58 to 63
run_discover_build_and_push = TriggerDagRunOperator.partial(
    task_id="trigger_discover_items_dag",
    trigger_dag_id="veda_discover",
    wait_for_completion=True,
).expand(conf=items) >> end

collection_grp.set_upstream(start)
submit_stac.set_downstream(end)
Contributor:

It looks like this will allow the parent DAG to properly track the state of mapped+triggered DAGs, but still recreates a disconnect (the link created does not associate DAG runs with a parent DAG, it only associates the DAG itself with its parent).

Comment on lines +52 to +56
def get_files_task(payload, ti=None):
    """
    Get files from S3 produced by discovery or dataset tasks.
    Handles both single payload and multiple payload scenarios.
    """
Contributor:

🚀, this is much nicer than what we were doing before

dags/veda_data_pipeline/veda_discover_pipeline.py (outdated, resolved)
Comment on lines +97 to +101
run_discover_build_and_push = TriggerDagRunOperator.partial(
    task_id="trigger_discover_items_dag",
    trigger_dag_id="veda_discover",
    wait_for_completion=True
).expand(conf=mutated_payloads) >> end
Contributor:

I get that this promotes reusability at the level of the discover DAG, but I worry that this creates too much of a breakdown in reuse and observability at the task level.

  • Rather than passing task-specific data structures as parameters, we end up relying on ti.dag_run.conf. This means that, unless we always build and ingest items through the discover DAG, we cannot rely on the data format being the same for different DAGs with different ingestion strategies. This will eventually bring us back to the same solution we have now, where new DAGs skip the discover DAG and simply reuse its tasks, with additional wrapper steps to manipulate the input. We should try to lean into this: the changes you made to get_files() are a great example, where we promote modularity and reuse at the task level so that it doesn't matter what the incoming dag_run.conf is (see the sketch after this list).
  • Similarly, this condenses task status into a single node in the dataset DAG. Meanwhile, in the triggered discover DAG, there are several expanded steps with unique failure conditions. This means that a failure in one step in one triggered DAG will require a retry of the complete DAG, rather than a single task.
  • TriggerDagRunOperator adds a link between DAGs, but not between executions. I was hoping this would work better with expand(), but unfortunately we're out of luck. This (and a few similar issues) is being tracked as a bug in Airflow, but until it's fixed, I think we should steer clear so as to maintain execution-level observability, especially in DAG runs with a large number of mapped discovery tasks.
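A minimal sketch of the task-level reuse described in the first bullet; the names and the payload structure ("objects") are placeholders:

    from airflow.decorators import task


    @task
    def get_files_task(payload: dict, ti=None):
        # Takes an explicit payload instead of reading ti.dag_run.conf, so any
        # DAG (dataset, discover, stactools) can map it over inputs it prepared
        # itself, regardless of what its own dag_run.conf looks like.
        return payload.get("objects", [])


    # Hypothetical usage from a calling DAG:
    # files = get_files_task.expand(payload=prepare_payloads_task())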

Contributor (Author):

Yes, I agree that traceability will be necessary for debugging any issues. Some have reported that this problem is fixed in Airflow 2.10. However, we still need to promote task reusability to avoid defining the same logic twice. I will close this PR and open a new one with some refactoring to make debugging and development easier.

Contributor (Author):

Closing this in favor of #268
