diff --git a/dags/veda_data_pipeline/utils/collection_generation.py b/dags/veda_data_pipeline/utils/collection_generation.py index 02b3a659..e04e1356 100644 --- a/dags/veda_data_pipeline/utils/collection_generation.py +++ b/dags/veda_data_pipeline/utils/collection_generation.py @@ -27,6 +27,7 @@ class GenerateCollection: "is_periodic", "time_density", "type", + "transfer" ] def get_template(self, dataset: Dict[str, Any]) -> dict: diff --git a/dags/veda_data_pipeline/veda_dataset_pipeline.py b/dags/veda_data_pipeline/veda_dataset_pipeline.py index c7356976..c4a7d418 100644 --- a/dags/veda_data_pipeline/veda_dataset_pipeline.py +++ b/dags/veda_data_pipeline/veda_dataset_pipeline.py @@ -77,6 +77,11 @@ def mutate_payload(**kwargs): payload["assets"] = assets return payload +@task(max_active_tis_per_dag=3) +def transfer_assets_to_production_bucket(payload): + # TODO + return payload + template_dag_run_conf = { "collection": "", @@ -95,7 +100,8 @@ def mutate_payload(**kwargs): "is_periodic": "", "license": "", "time_density": "", - "title": "" + "title": "", + "transfer": " # transfer assets to production bucket if true (false by default)", } with DAG("veda_dataset_pipeline", params=template_dag_run_conf, **dag_args) as dag: @@ -108,9 +114,17 @@ def mutate_payload(**kwargs): mutate_payload_task = mutate_payload() discover = discover_from_s3_task.partial(alt_payload=mutate_payload_task).expand(event=extract_discovery_items()) discover.set_upstream(collection_grp) # do not discover until collection exists - get_files = get_dataset_files_to_process(payload=discover) - build_stac = build_stac_task.expand(payload=get_files) + get_files = get_dataset_files_to_process(payload=discover) # untangle mapped data format to get iterable payloads from discover step + + # asset transfer to production bucket + transfer_flag = dag.params.get("transfer", False) + if transfer_flag: + transfer_task = transfer_assets_to_production_bucket(payload=get_files) + build_stac = build_stac_task.expand(payload=transfer_task) + else: + build_stac = build_stac_task.expand(payload=get_files) + build_stac.set_upstream(get_files) # .output is needed coming from a non-taskflow operator submit_stac = submit_to_stac_ingestor_task.expand(built_stac=build_stac) diff --git a/sm2a/Makefile b/sm2a/Makefile index 57a4e020..2366a737 100644 --- a/sm2a/Makefile +++ b/sm2a/Makefile @@ -10,11 +10,11 @@ info_message = \ count_down = \ @echo "Spinning up the system please wait..."; \ - secs=40 ;\ - while [ $$secs -gt 0 ]; do \ - printf "%d\033[0K\r" $$secs; \ + secs=40; \ + while [ $secs -gt 0 ]; do \ + printf "%d\033[0K\r" $secs; \ sleep 1; \ - : $$((secs--)); \ + ((secs--)); \ done; .PHONY: