Fixing collection ingest DAG #261
Conversation
Is there additional context to this PR? I didn't know we had issues with collection ingests.
```
run_discover_build_and_push = TriggerMultiDagRunOperator(
    task_id="trigger_discover_items_dag",
    dag=dag,
    trigger_dag_id="veda_discover",
    python_callable=trigger_discover_and_build_task,
)
```
We should not use this operator, as it creates a disconnect between the original DAGRun event and subsequent DAG runs (i.e. a failure in an instance of veda-discover will not feed back to the original veda-dataset DAG when using this operator). We could instead describe the discover pipeline in a TaskGroup and call expand() on it, which will map task instances to the correct event.
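For illustration, a minimal sketch of that mapped-TaskGroup approach. The task, group, and DAG names (discover_items, build_stac, submit_items, discover_pipeline, veda_dataset_example) and the task bodies are hypothetical placeholders, not the actual veda_discover tasks:

```python
from datetime import datetime

from airflow.decorators import dag, task, task_group


@task
def discover_items(conf: dict) -> list:
    # Placeholder: discover the files described by one discovery config.
    return [conf]


@task
def build_stac(items: list) -> list:
    # Placeholder: build STAC items from the discovered files.
    return items


@task
def submit_items(items: list):
    # Placeholder: submit the built items to the ingest API.
    pass


@task_group
def discover_pipeline(conf: dict):
    # Chain the discover/build/submit tasks inside one group.
    submit_items(build_stac(discover_items(conf)))


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def veda_dataset_example():
    # One mapped group instance per discovery config; failures stay visible
    # in this DAG run instead of a separately triggered run.
    discover_pipeline.expand(conf=[{"collection": "example-collection"}])


veda_dataset_example()
```

Mapped task groups require Airflow 2.5 or newer.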
I want to re-use the discover_build_ingest DAG instead of redefining the same tasks. But having a task group used in both DAGs is not a bad idea.
sm2a/airflow_worker/Dockerfile
Outdated
```
@@ -41,7 +41,9 @@ COPY --chown=airflow:airflow scripts "${AIRFLOW_HOME}/scripts"

RUN cp ${AIRFLOW_HOME}/configuration/airflow.cfg* ${AIRFLOW_HOME}/.

RUN pip install pypgstac==0.7.4
# Commented because it downgrades pydantic to v1
```
This is an artifact and we should not need pypgstac in Airflow; that should be handled entirely by the ingest API.
fix: remove pypgstac install
…s in collection generation dag
fix: remove python version dependent datetime formatter in collection generation
Collection generation print statement
Remove comma in collection generation
```
@task()
def generate_collection_task(ti):
    config = ti.dag_run.conf
    airflow_vars_json = Variable.get("aws_dags_variables", deserialize_json=True)
    role_arn = airflow_vars_json.get("ASSUME_ROLE_READ_ARN")

    # TODO it would be ideal if this also works with complete collections where provided - this would make the collection ingest more re-usable
    generator = GenerateCollection()
    collection = generator.generate_stac(
        dataset_config=config, role_arn=role_arn
    )
    return collection


@task()
def ingest_collection_task(collection):
    """
    Ingest a collection into the STAC catalog

    Args:
        collection:
    """
    airflow_vars_json = Variable.get("aws_dags_variables", deserialize_json=True)
    cognito_app_secret = airflow_vars_json.get("COGNITO_APP_SECRET")
    stac_ingestor_api_url = airflow_vars_json.get("STAC_INGESTOR_API_URL")

    return submission_handler(
        event=collection,
        endpoint="/collections",
        cognito_app_secret=cognito_app_secret,
        stac_ingestor_api_url=stac_ingestor_api_url
    )
```
Let's define these task() functions outside of the TaskGroup, so they can be re-used on their own as well.
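A rough sketch of that layout, with the task() functions at module level and the TaskGroup only doing the wiring. The group_id, the parameter-passing style, and the placeholder bodies are assumptions, not the PR's actual code (the real tasks read Airflow Variables and dag_run.conf):

```python
from airflow.decorators import task, task_group


@task()
def generate_collection_task(dataset_config: dict) -> dict:
    # Placeholder for GenerateCollection().generate_stac(dataset_config=..., role_arn=...)
    return {"id": dataset_config.get("collection")}


@task()
def ingest_collection_task(collection: dict) -> dict:
    # Placeholder for submission_handler(event=collection, endpoint="/collections", ...)
    return collection


@task_group(group_id="collection_group")
def collection_group(dataset_config: dict):
    # The group only composes the shared tasks; other DAGs can import
    # generate_collection_task / ingest_collection_task directly.
    ingest_collection_task(generate_collection_task(dataset_config))
```

Another DAG could then import ingest_collection_task on its own (e.g. for stactools-generated collections), while DAGs that want the whole pipeline call collection_group(dataset_config=...).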
^^ this is a breaking change for stactools, which come with their own collection generators but re-use our collection submission task.
```
run_discover_build_and_push = TriggerDagRunOperator.partial(
    task_id="trigger_discover_items_dag",
    trigger_dag_id="veda_discover",
    wait_for_completion=True,
).expand(conf=items) >> end

collection_grp.set_upstream(start)
submit_stac.set_downstream(end)
```
It looks like this will allow the parent DAG to properly track the state of mapped and triggered DAGs, but it still recreates a disconnect (the link created does not associate DAG runs with a parent DAG; it only associates the DAG itself with its parent).
Co-authored-by: Alexandra Kirk <[email protected]>
```
def get_files_task(payload, ti=None):
    """
    Get files from S3 produced by discovery or dataset tasks.
    Handles both single payload and multiple payload scenarios.
    """
```
🚀, this is much nicer than what we were doing before
```
run_discover_build_and_push = TriggerDagRunOperator.partial(
    task_id="trigger_discover_items_dag",
    trigger_dag_id="veda_discover",
    wait_for_completion=True
).expand(conf=mutated_payloads) >> end
```
I get that this promotes reusability at the level of the discover DAG, but I worry that this creates too much of a breakdown in reuse and observability at the task level.
- Rather than passing task-specific data structures as parameters, we end up relying on ti.dag_run.conf. This means that, unless we always build and ingest items through the discover DAG, we cannot rely on the data format being the same for different DAGs with different ingestion strategies. This will eventually bring us back to the same solution we have now, where new DAGs skip using the discover DAG and simply reuse the tasks with additional wrapper steps to manipulate the input. We should try to lean into this: the changes you made to get_files() are a great example, where we promote modularity and reuse at the task level so that it doesn't matter what the incoming dag_run.conf is.
- Similarly, this condenses task status into a single node in the dataset DAG. Meanwhile, in the triggered discover DAG, there are several expanded steps with unique failure conditions. This means that a failure in one step in one triggered DAG will require a retry of the complete DAG rather than a single task. TriggerDagRunOperator adds a link between DAGs, but not between executions. I was hoping this would work better with expand(), but unfortunately we're out of luck. This (and a few similar issues) is being tracked as a bug in Airflow, but until it's fixed, I think we should steer clear so as to maintain execution-level observability, especially in DAG runs with a large number of mapped discovery tasks.
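To make the trade-off concrete, a minimal sketch of the task-level reuse pattern argued for above, with shared tasks mapped directly in the dataset DAG instead of triggering veda_discover. All names and bodies here are illustrative placeholders:

```python
from datetime import datetime

from airflow.decorators import dag, task


@task
def discover_items(conf: dict) -> list:
    # Placeholder: return one payload dict per discovered batch of files.
    return [conf]


@task
def build_and_submit(payload: dict):
    # Placeholder: build STAC items for one payload and submit them to the ingest API.
    pass


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def veda_dataset_task_level_example():
    payloads = discover_items({"collection": "example-collection"})
    # Each payload becomes its own mapped task instance, so an individual
    # failure can be retried here without re-running a whole triggered DAG.
    build_and_submit.expand(payload=payloads)


veda_dataset_task_level_example()
```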
Yes, I agree that traceability will be necessary for debugging any issues. Some have reported that this problem is fixed in Airflow 2.10. However, we still need to promote task reusability to avoid defining the same logic twice. I will close this PR and open a new one with some refactoring to make debugging and development easier.
Closing this in favor of #268
Summary of changes
Fixing collection ingest
Changes