Skip to content

Commit

Permalink
Add placeholder transfer to dataset DAG (no-op for now, just testing …
Browse files Browse the repository at this point in the history
…control flow)

- also modified sm2a makefile to run on ubuntu
  • Loading branch information
ividito committed Dec 4, 2024
1 parent 0e5cc7c commit 44bcb43
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 7 deletions.
1 change: 1 addition & 0 deletions dags/veda_data_pipeline/utils/collection_generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class GenerateCollection:
"is_periodic",
"time_density",
"type",
"transfer"
]

def get_template(self, dataset: Dict[str, Any]) -> dict:
Expand Down
20 changes: 17 additions & 3 deletions dags/veda_data_pipeline/veda_dataset_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ def mutate_payload(**kwargs):
payload["assets"] = assets
return payload

@task(max_active_tis_per_dag=3)
def transfer_assets_to_production_bucket(payload):
# TODO
return payload


template_dag_run_conf = {
"collection": "<collection-id>",
Expand All @@ -95,7 +100,8 @@ def mutate_payload(**kwargs):
"is_periodic": "<true|false>",
"license": "<collection-LICENSE>",
"time_density": "<time-density>",
"title": "<collection-title>"
"title": "<collection-title>",
"transfer": "<true|false> # transfer assets to production bucket if true (false by default)",
}

with DAG("veda_dataset_pipeline", params=template_dag_run_conf, **dag_args) as dag:
Expand All @@ -108,9 +114,17 @@ def mutate_payload(**kwargs):
mutate_payload_task = mutate_payload()
discover = discover_from_s3_task.partial(alt_payload=mutate_payload_task).expand(event=extract_discovery_items())
discover.set_upstream(collection_grp) # do not discover until collection exists
get_files = get_dataset_files_to_process(payload=discover)

build_stac = build_stac_task.expand(payload=get_files)
get_files = get_dataset_files_to_process(payload=discover) # untangle mapped data format to get iterable payloads from discover step

# asset transfer to production bucket
transfer_flag = dag.params.get("transfer", False)
if transfer_flag:
transfer_task = transfer_assets_to_production_bucket(payload=get_files)
build_stac = build_stac_task.expand(payload=transfer_task)
else:
build_stac = build_stac_task.expand(payload=get_files)
build_stac.set_upstream(get_files)
# .output is needed coming from a non-taskflow operator
submit_stac = submit_to_stac_ingestor_task.expand(built_stac=build_stac)

Expand Down
8 changes: 4 additions & 4 deletions sm2a/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ info_message = \

count_down = \
@echo "Spinning up the system please wait..."; \
secs=40 ;\
while [ $$secs -gt 0 ]; do \
printf "%d\033[0K\r" $$secs; \
secs=40; \
while [ $secs -gt 0 ]; do \
printf "%d\033[0K\r" $secs; \
sleep 1; \
: $$((secs--)); \
((secs--)); \
done;

.PHONY:
Expand Down

0 comments on commit 44bcb43

Please sign in to comment.