-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixing collecton ingest DAG #261
Changes from 18 commits
0be8c4c
fe09938
56738e9
66193a8
4fbd229
d9799cc
926be29
7e05d2f
2974701
5d8757e
948aaec
64683af
9d7c611
3a48975
0e5cc7c
0556329
9024bb8
195f659
ff1aa2a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ | |
from veda_data_pipeline.utils.s3_discovery import ( | ||
s3_discovery_handler, EmptyFileListError | ||
) | ||
from deprecated import deprecated | ||
|
||
group_kwgs = {"group_id": "Discover", "tooltip": "Discover"} | ||
|
||
|
@@ -48,6 +49,36 @@ def discover_from_s3_task(ti=None, event={}, **kwargs): | |
|
||
|
||
@task | ||
def get_files_task(payload, ti=None): | ||
""" | ||
Get files from S3 produced by discovery or dataset tasks. | ||
Handles both single payload and multiple payload scenarios. | ||
""" | ||
Comment on lines
+52
to
+56
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🚀, this is much nicer than what we were doing before |
||
dag_run_id = ti.dag_run.run_id | ||
results = [] | ||
|
||
# Handle multiple payloads (dataset and items case) | ||
payloads = payload if isinstance(payload, list) else [payload] | ||
|
||
for item in payloads: | ||
if isinstance(item, LazyXComAccess): # Dynamic task mapping case | ||
payloads_xcom = item[0].pop("payload", []) | ||
base_payload = item[0] | ||
else: | ||
payloads_xcom = item.pop("payload", []) | ||
base_payload = item | ||
|
||
for indx, payload_xcom in enumerate(payloads_xcom): | ||
results.append({ | ||
"run_id": f"{dag_run_id}_{uuid.uuid4()}_{indx}", | ||
**base_payload, | ||
"payload": payload_xcom, | ||
}) | ||
|
||
return results | ||
|
||
@task | ||
@deprecated(reason="Please use get_files_task function that hundles both files and dataset files use cases") | ||
def get_files_to_process(payload, ti=None): | ||
"""Get files from S3 produced by the discovery task. | ||
Used as part of both the parallel_run_process_rasters and parallel_run_process_vectors tasks. | ||
|
@@ -66,6 +97,7 @@ def get_files_to_process(payload, ti=None): | |
|
||
|
||
@task | ||
@deprecated(reason="Please use get_files_task airflow task instead. This will be removed in the new release") | ||
def get_dataset_files_to_process(payload, ti=None): | ||
"""Get files from S3 produced by the dataset task. | ||
This is different from the get_files_to_process task as it produces a combined structure from repeated mappings. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,38 +1,37 @@ | ||
import pendulum | ||
from airflow import DAG | ||
from airflow.models.param import Param | ||
from airflow.decorators import task | ||
from airflow.operators.trigger_dagrun import TriggerDagRunOperator | ||
from airflow.operators.dummy_operator import DummyOperator as EmptyOperator | ||
from airflow.models.variable import Variable | ||
import json | ||
from veda_data_pipeline.groups.collection_group import collection_task_group | ||
from veda_data_pipeline.groups.discover_group import discover_from_s3_task, get_dataset_files_to_process | ||
from veda_data_pipeline.groups.processing_tasks import submit_to_stac_ingestor_task | ||
|
||
dag_doc_md = """ | ||
template_dag_run_conf = { | ||
"collection": "<collection-id>", | ||
"data_type": "cog", | ||
"description": "<collection-description>", | ||
"discovery_items": [ | ||
{ | ||
"bucket": "<bucket-name>", | ||
"datetime_range": "<range>", | ||
"discovery": "s3", | ||
"filename_regex": "<regex>", | ||
"prefix": "<example-prefix/>", | ||
} | ||
], | ||
"is_periodic": "<true|false>", | ||
"license": "<collection-LICENSE>", | ||
"time_density": "<time-density>", | ||
"title": "<collection-title>", | ||
} | ||
|
||
dag_doc_md = f""" | ||
### Dataset Pipeline | ||
Generates a collection and triggers the file discovery process | ||
#### Notes | ||
- This DAG can run with the following configuration <br> | ||
```json | ||
{ | ||
"collection": "collection-id", | ||
"data_type": "cog", | ||
"description": "collection description", | ||
"discovery_items": | ||
[ | ||
{ | ||
"bucket": "veda-data-store-staging", | ||
"datetime_range": "year", | ||
"discovery": "s3", | ||
"filename_regex": "^(.*).tif$", | ||
"prefix": "example-prefix/" | ||
} | ||
], | ||
"is_periodic": true, | ||
"license": "collection-LICENSE", | ||
"time_density": "year", | ||
"title": "collection-title" | ||
} | ||
{template_dag_run_conf} | ||
``` | ||
""" | ||
|
||
|
@@ -44,24 +43,6 @@ | |
"tags": ["collection", "discovery"], | ||
} | ||
|
||
|
||
@task | ||
def extract_discovery_items(**kwargs): | ||
ti = kwargs.get("ti") | ||
discovery_items = ti.dag_run.conf.get("discovery_items") | ||
print(discovery_items) | ||
return discovery_items | ||
|
||
|
||
@task(max_active_tis_per_dag=3) | ||
def build_stac_task(payload): | ||
from veda_data_pipeline.utils.build_stac.handler import stac_handler | ||
airflow_vars = Variable.get("aws_dags_variables") | ||
airflow_vars_json = json.loads(airflow_vars) | ||
event_bucket = airflow_vars_json.get("EVENT_BUCKET") | ||
return stac_handler(payload_src=payload, bucket_output=event_bucket) | ||
|
||
|
||
template_dag_run_conf = { | ||
"collection": "<collection-id>", | ||
"data_type": "cog", | ||
|
@@ -83,19 +64,38 @@ def build_stac_task(payload): | |
} | ||
|
||
with DAG("veda_dataset_pipeline", params=template_dag_run_conf, **dag_args) as dag: | ||
# ECS dependency variable | ||
start = EmptyOperator(task_id="start") | ||
end = EmptyOperator(task_id="end") | ||
|
||
start = EmptyOperator(task_id="start", dag=dag) | ||
end = EmptyOperator(task_id="end", dag=dag) | ||
|
||
collection_grp = collection_task_group() | ||
discover = discover_from_s3_task.expand(event=extract_discovery_items()) | ||
discover.set_upstream(collection_grp) # do not discover until collection exists | ||
get_files = get_dataset_files_to_process(payload=discover) | ||
@task() | ||
def mutate_payload(**kwargs): | ||
ti = kwargs.get("ti") | ||
payload = ti.dag_run.conf.copy() | ||
payloads = list() | ||
if assets := payload.get("assets"): | ||
# remove thumbnail asset if provided in collection config | ||
if "thumbnail" in assets.keys(): | ||
assets.pop("thumbnail") | ||
# if thumbnail was only asset, delete assets | ||
if not assets: | ||
payload.pop("assets") | ||
# finally put the mutated assets back in the payload | ||
else: | ||
payload["assets"] = assets | ||
for item in payload.get("discovery_items"): | ||
payloads.append({ | ||
**payload, | ||
**item | ||
} | ||
) | ||
ividito marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
return payloads | ||
|
||
build_stac = build_stac_task.expand(payload=get_files) | ||
# .output is needed coming from a non-taskflow operator | ||
submit_stac = submit_to_stac_ingestor_task.expand(built_stac=build_stac) | ||
|
||
collection_grp.set_upstream(start) | ||
submit_stac.set_downstream(end) | ||
mutated_payloads = start >> collection_task_group() >> mutate_payload() | ||
run_discover_build_and_push = TriggerDagRunOperator.partial( | ||
task_id="trigger_discover_items_dag", | ||
trigger_dag_id="veda_discover", | ||
wait_for_completion=True | ||
).expand(conf=mutated_payloads) >> end | ||
Comment on lines
+97
to
+101
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I get that this promotes reuseability at the level of the
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I agree that traceability will be necessary for debugging any issues. Some have reported that this problem is fixed in Airflow 2.10. However, we still need to promote task reusability to avoid defining the same logic twice. I will close this PR and open a new one with some refactoring to make debugging and development easier. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Closing this in favor of #268 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's define these
task()
functions outside of the TaskGroup, so they can be re-used on their own as well.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
^^ this is a breaking change for
stactools
, which come with their own collection generators, but re-use our collection submission task