diff --git a/dags/veda_data_pipeline/groups/transfer_group.py b/dags/veda_data_pipeline/groups/transfer_group.py index 3bf30e1f..f8f09c35 100644 --- a/dags/veda_data_pipeline/groups/transfer_group.py +++ b/dags/veda_data_pipeline/groups/transfer_group.py @@ -5,6 +5,7 @@ import json from airflow.utils.task_group import TaskGroup from airflow.utils.trigger_rule import TriggerRule +from airflow.decorators import task group_kwgs = {"group_id": "Transfer", "tooltip": "Transfer"} @@ -27,13 +28,22 @@ def cogify_copy_task(ti): external_role_arn = airflow_vars_json.get("ASSUME_ROLE_WRITE_ARN") return cogify_transfer_handler(event_src=config, external_role_arn=external_role_arn) - -def transfer_data(ti): +@task +def transfer_data(ti, payload): """Transfer data from one S3 bucket to another; s3 copy, no need for docker""" from veda_data_pipeline.utils.transfer import ( data_transfer_handler, ) - config = ti.dag_run.conf + # merge the task payload with dag_run.conf; conf overrides shared keys, while the renamed keys below prefer payload values + # payload will generally have the same values expected by discovery, so some renames are needed when combining the dicts + config = { + **payload, + **ti.dag_run.conf, + "origin_bucket": payload.get("bucket", ti.dag_run.conf.get("origin_bucket", "veda-data-store")), + "origin_prefix": payload.get("prefix", ti.dag_run.conf.get("origin_prefix", "s3-prefix/")), + "target_bucket": payload.get("target_bucket", ti.dag_run.conf.get("target_bucket", "veda-data-store")), + "dry_run": payload.get("dry_run", ti.dag_run.conf.get("dry_run", True)),  # TODO default false before merge + } airflow_vars = Variable.get("aws_dags_variables") airflow_vars_json = json.loads(airflow_vars) external_role_arn = airflow_vars_json.get("ASSUME_ROLE_WRITE_ARN") diff --git a/dags/veda_data_pipeline/veda_dataset_pipeline.py b/dags/veda_data_pipeline/veda_dataset_pipeline.py index c4a7d418..c6b1f2b6 100644 --- a/dags/veda_data_pipeline/veda_dataset_pipeline.py +++ b/dags/veda_data_pipeline/veda_dataset_pipeline.py @@ 
-79,7 +79,11 @@ def mutate_payload(**kwargs): @task(max_active_tis_per_dag=3) def transfer_assets_to_production_bucket(payload): - # TODO + # transfer assets to the production bucket before discovery runs + transfer_data(payload) + # if transfer complete, update discovery payload to reflect new bucket + payload.update({"bucket": "veda-data-store"}) + payload.update({"prefix": payload.get("collection")+"/"}) return payload @@ -112,18 +116,21 @@ transfer_assets_to_production_bucket(payload): collection_grp = collection_task_group() mutate_payload_task = mutate_payload() - discover = discover_from_s3_task.partial(alt_payload=mutate_payload_task).expand(event=extract_discovery_items()) - discover.set_upstream(collection_grp) # do not discover until collection exists - - get_files = get_dataset_files_to_process(payload=discover) # untangle mapped data format to get iterable payloads from discover step + extract_from_payload = extract_discovery_items() # asset transfer to production bucket transfer_flag = dag.params.get("transfer", False) if transfer_flag: - transfer_task = transfer_assets_to_production_bucket(payload=get_files) - build_stac = build_stac_task.expand(payload=transfer_task) + transfer_task = transfer_assets_to_production_bucket.expand(payload=extract_from_payload) + discover = discover_from_s3_task.partial(alt_payload=mutate_payload_task).expand(event=transfer_task) else: - build_stac = build_stac_task.expand(payload=get_files) + discover = discover_from_s3_task.partial(alt_payload=mutate_payload_task).expand(event=extract_from_payload) + discover.set_upstream(collection_grp) # do not discover until collection exists + + get_files = get_dataset_files_to_process(payload=discover) # untangle mapped data format to get iterable payloads from discover step + + + build_stac = build_stac_task.expand(payload=get_files) build_stac.set_upstream(get_files) # .output is needed coming from a non-taskflow operator submit_stac = submit_to_stac_ingestor_task.expand(built_stac=build_stac) diff --git 
a/sm2a/Makefile b/sm2a/Makefile index 616f74d6..6efc19f2 100644 --- a/sm2a/Makefile +++ b/sm2a/Makefile @@ -12,9 +12,9 @@ count_down = \ @echo "Spinning up the system please wait..."; \ secs=40; \ - while [ $secs -gt 0 ]; do \ - printf "%d\033[0K\r" "$secs"; \ + while [ $$secs -gt 0 ]; do \ + printf "%d\033[0K\r" "$$secs"; \ sleep 1; \ - ((secs--)); \ + secs=$$((secs - 1)); \ done; .PHONY: