From 4ba98afd3127ba38262a01d3dc60c9e0f8deb275 Mon Sep 17 00:00:00 2001 From: ividito Date: Wed, 27 Nov 2024 16:59:54 -0400 Subject: [PATCH 1/3] Strip thumbnail assets from payload before discovery --- .../groups/discover_group.py | 18 +++++++++++++----- .../veda_dataset_pipeline.py | 16 +++++++++++++++- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/dags/veda_data_pipeline/groups/discover_group.py b/dags/veda_data_pipeline/groups/discover_group.py index 63f41dd6..6c486f58 100644 --- a/dags/veda_data_pipeline/groups/discover_group.py +++ b/dags/veda_data_pipeline/groups/discover_group.py @@ -13,20 +13,28 @@ @task(retries=1, retry_delay=timedelta(minutes=1)) -def discover_from_s3_task(ti=None, event={}, **kwargs): +def discover_from_s3_task(ti=None, event={}, alt_payload = None, **kwargs): """Discover grouped assets/files from S3 in batches of 2800. Produce a list of such files stored on S3 to process. This task is used as part of the discover_group subdag and outputs data to EVENT_BUCKET. """ - config = { - **event, - **ti.dag_run.conf, - } + if alt_payload: + config = { + **event, + **alt_payload + } + else: + config = { + **event, + **ti.dag_run.conf, + } # TODO test that this context var is available in taskflow last_successful_execution = kwargs.get("prev_start_date_success") if event.get("schedule") and last_successful_execution: config["last_successful_execution"] = last_successful_execution.isoformat() # (event, chunk_size=2800, role_arn=None, bucket_output=None): + if event.get("item_assets") and event.get("assets"): + config["assets"] = event.get("item_assets") airflow_vars = Variable.get("aws_dags_variables") airflow_vars_json = json.loads(airflow_vars) event_bucket = airflow_vars_json.get("EVENT_BUCKET") diff --git a/dags/veda_data_pipeline/veda_dataset_pipeline.py b/dags/veda_data_pipeline/veda_dataset_pipeline.py index 1c8746cb..987f5a27 100644 --- a/dags/veda_data_pipeline/veda_dataset_pipeline.py +++ b/dags/veda_data_pipeline/veda_dataset_pipeline.py @@ -61,6 +61,18 @@ def build_stac_task(payload): event_bucket = airflow_vars_json.get("EVENT_BUCKET") return stac_handler(payload_src=payload, bucket_output=event_bucket) +@task() +def mutate_payload(**kwargs): + ti = kwargs.get("ti") + payload = ti.dag_run.conf + if assets := payload.get("assets"): + # is first key thumbnail + if "thumbnail" in assets.keys(): + assets.pop("thumbnail") + if not assets: + payload.pop("assets") + return payload + template_dag_run_conf = { "collection": "", @@ -89,7 +101,8 @@ def build_stac_task(payload): end = EmptyOperator(task_id="end", dag=dag) collection_grp = collection_task_group() - discover = discover_from_s3_task.expand(event=extract_discovery_items()) + mutate_assets_task = mutate_payload() + discover = discover_from_s3_task.partial(alt_payload=mutate_assets_task()).expand(event=extract_discovery_items()) discover.set_upstream(collection_grp) # do not discover until collection exists get_files = get_dataset_files_to_process(payload=discover) @@ -98,4 +111,5 @@ def build_stac_task(payload): submit_stac = submit_to_stac_ingestor_task.expand(built_stac=build_stac) collection_grp.set_upstream(start) + mutate_assets_task.set_upstream(start) submit_stac.set_downstream(end) From 1c7dd8d93e5905ccc4b7e87a38e54d14ebf7a9a2 Mon Sep 17 00:00:00 2001 From: ividito Date: Wed, 27 Nov 2024 17:15:53 -0400 Subject: [PATCH 2/3] rename var --- dags/veda_data_pipeline/veda_dataset_pipeline.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dags/veda_data_pipeline/veda_dataset_pipeline.py b/dags/veda_data_pipeline/veda_dataset_pipeline.py index 987f5a27..a049f32d 100644 --- a/dags/veda_data_pipeline/veda_dataset_pipeline.py +++ b/dags/veda_data_pipeline/veda_dataset_pipeline.py @@ -101,8 +101,8 @@ def mutate_payload(**kwargs): end = EmptyOperator(task_id="end", dag=dag) collection_grp = collection_task_group() - mutate_assets_task = mutate_payload() - discover = discover_from_s3_task.partial(alt_payload=mutate_assets_task()).expand(event=extract_discovery_items()) + mutate_payload_task = mutate_payload() + discover = discover_from_s3_task.partial(alt_payload=mutate_payload_task()).expand(event=extract_discovery_items()) discover.set_upstream(collection_grp) # do not discover until collection exists get_files = get_dataset_files_to_process(payload=discover) @@ -111,5 +111,5 @@ def mutate_payload(**kwargs): submit_stac = submit_to_stac_ingestor_task.expand(built_stac=build_stac) collection_grp.set_upstream(start) - mutate_assets_task.set_upstream(start) + mutate_payload_task.set_upstream(start) submit_stac.set_downstream(end) From d8db95fae1d586277931da33cef81f8a79f363b9 Mon Sep 17 00:00:00 2001 From: Isayah Vidito Date: Wed, 27 Nov 2024 21:22:30 -0400 Subject: [PATCH 3/3] Update dags/veda_data_pipeline/veda_dataset_pipeline.py Co-authored-by: Alexandra Kirk --- dags/veda_data_pipeline/veda_dataset_pipeline.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/dags/veda_data_pipeline/veda_dataset_pipeline.py b/dags/veda_data_pipeline/veda_dataset_pipeline.py index a049f32d..2980bfeb 100644 --- a/dags/veda_data_pipeline/veda_dataset_pipeline.py +++ b/dags/veda_data_pipeline/veda_dataset_pipeline.py @@ -66,11 +66,15 @@ def mutate_payload(**kwargs): ti = kwargs.get("ti") payload = ti.dag_run.conf if assets := payload.get("assets"): - # is first key thumbnail + # remove thumbnail asset if provided in collection config if "thumbnail" in assets.keys(): assets.pop("thumbnail") + # if thumbnail was only asset, delete assets if not assets: payload.pop("assets") + # finally put the mutated assets back in the payload + else: + payload["assets"] = assets return payload