Skip to content

Commit

Permalink
Reorder transfer <> discovery, change transfer task input
Browse files Browse the repository at this point in the history
  • Loading branch information
ividito committed Dec 4, 2024
1 parent 82ff819 commit 389aa9a
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 13 deletions.
16 changes: 13 additions & 3 deletions dags/veda_data_pipeline/groups/transfer_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import json
from airflow.utils.task_group import TaskGroup
from airflow.utils.trigger_rule import TriggerRule
from airflow.decorators import task

group_kwgs = {"group_id": "Transfer", "tooltip": "Transfer"}

Expand All @@ -27,13 +28,22 @@ def cogify_copy_task(ti):
external_role_arn = airflow_vars_json.get("ASSUME_ROLE_WRITE_ARN")
return cogify_transfer_handler(event_src=config, external_role_arn=external_role_arn)


def transfer_data(ti):
@task
def transfer_data(ti, payload):
"""Transfer data from one S3 bucket to another; s3 copy, no need for docker"""
from veda_data_pipeline.utils.transfer import (
data_transfer_handler,
)
config = ti.dag_run.conf
# use task-provided payload if provided, otherwise fall back on ti values
# payload will generally have the same values expected by discovery, so some renames are needed when combining the dicts
config = {
**payload,
**ti.dag_run.conf,
"origin_bucket": payload.get("bucket", ti.dag_run.conf.get("origin_bucket", "veda-data-store")),
"origin_prefix": payload.get("prefix", ti.dag_run.conf.get("origin_prefix", "s3-prefix/")),
"target_bucket": payload.get("target_bucket", ti.dag_run.conf.get("target_bucket", "veda-data-store")),
"dry_run": payload.get("dry_run", ti.dag_run.conf.get("dry_run", True)),# TODO default false before merge
}
airflow_vars = Variable.get("aws_dags_variables")
airflow_vars_json = json.loads(airflow_vars)
external_role_arn = airflow_vars_json.get("ASSUME_ROLE_WRITE_ARN")
Expand Down
23 changes: 15 additions & 8 deletions dags/veda_data_pipeline/veda_dataset_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ def mutate_payload(**kwargs):

@task(max_active_tis_per_dag=3)
def transfer_assets_to_production_bucket(payload):
    """Copy one discovered dataset's assets to the production bucket, then
    rewrite the discovery payload so downstream discovery reads from it.

    Args:
        payload (dict): discovery-style payload; assumed to carry a
            "collection" key — TODO confirm against the upstream producer.

    Returns:
        dict: the same ``payload`` object, mutated in place with
        "bucket"/"prefix" pointing at the production location.
    """
    # TODO do transfer
    # NOTE(review): the sibling is declared as ``transfer_data(ti, payload)``,
    # so this positional call binds ``payload`` to the ``ti`` parameter —
    # confirm intended. It is also ``@task``-decorated; invoking a decorated
    # task from inside another running task does not execute it inline in
    # Airflow — verify this call actually performs the copy.
    transfer_data(payload)
    # if transfer complete, update discovery payload to reflect new bucket
    payload.update({"bucket": "veda-data-store"})
    # presumably the production prefix is "<collection>/"; note that a
    # missing "collection" key makes ``get`` return None and the ``+``
    # concatenation raise TypeError here.
    payload.update({"prefix": payload.get("collection")+"/"})
    return payload


Expand Down Expand Up @@ -112,18 +116,21 @@ def transfer_assets_to_production_bucket(payload):

collection_grp = collection_task_group()
mutate_payload_task = mutate_payload()
discover = discover_from_s3_task.partial(alt_payload=mutate_payload_task).expand(event=extract_discovery_items())
discover.set_upstream(collection_grp) # do not discover until collection exists

get_files = get_dataset_files_to_process(payload=discover) # untangle mapped data format to get iterable payloads from discover step
extract_from_payload = extract_discovery_items()

# asset transfer to production bucket
transfer_flag = dag.params.get("transfer", False)
if transfer_flag:
transfer_task = transfer_assets_to_production_bucket(payload=get_files)
build_stac = build_stac_task.expand(payload=transfer_task)
transfer_task = transfer_assets_to_production_bucket.expand(payload=extract_from_payload)
discover = discover_from_s3_task.partial(alt_payload=mutate_payload_task).expand(event=transfer_task)
else:
build_stac = build_stac_task.expand(payload=get_files)
discover = discover_from_s3_task.partial(alt_payload=mutate_payload_task).expand(event=extract_from_payload)
discover.set_upstream(collection_grp) # do not discover until collection exists

get_files = get_dataset_files_to_process(payload=discover) # untangle mapped data format to get iterable payloads from discover step


build_stac = build_stac_task.expand(payload=get_files)
build_stac.set_upstream(get_files)
# .output is needed coming from a non-taskflow operator
submit_stac = submit_to_stac_ingestor_task.expand(built_stac=build_stac)
Expand Down
4 changes: 2 additions & 2 deletions sm2a/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ count_down = \
@echo "Spinning up the system please wait..."; \
secs=40; \
while [ $secs -gt 0 ]; do \
printf "%d\033[0K\r" "$secs"; \
printf "%d\033[0K\r" "$$secs"; \
sleep 1; \
((secs--)); \
secs=$$((secs - 1)); \
done;

.PHONY:
Expand Down

0 comments on commit 389aa9a

Please sign in to comment.