From bbf2288e33033c9e41c93e5ae307b30318ffcc3b Mon Sep 17 00:00:00 2001 From: Jordan Laser Date: Wed, 23 Oct 2024 11:48:02 -0600 Subject: [PATCH] feat: implement check for gp execution --- .../src/python_tools/configure_datastream.py | 2 +- .../terraform/GETTING_STARTED.md | 4 +- .../execution_datastream_example.json | 2 +- .../executions/execution_gp_example.json | 2 +- .../execution_template_datastream.json | 2 +- .../checker/lambda_function.py | 54 ++++++++++++++----- .../streamcommander/lambda_function.py | 2 +- 7 files changed, 47 insertions(+), 21 deletions(-) diff --git a/python_tools/src/python_tools/configure_datastream.py b/python_tools/src/python_tools/configure_datastream.py index 37a1d7b71..076e8f4db 100644 --- a/python_tools/src/python_tools/configure_datastream.py +++ b/python_tools/src/python_tools/configure_datastream.py @@ -178,7 +178,7 @@ def create_conf_fp(args): output_file_type = ["netcdf"] if len(args.s3_bucket) > 0: if "DAILY" in args.start_date: - args.s3_prefix = re.sub(r"\$DAILY",datetime.now(tz.timezone('US/Eastern')).strftime('%Y%m%d'),args.s3_prefix) + args.s3_prefix = re.sub(r"DAILY",datetime.now(tz.timezone('US/Eastern')).strftime('%Y%m%d'),args.s3_prefix) output_path = f"s3://{args.s3_bucket}/{args.s3_prefix}" elif len(args.docker_mount) > 0: gpkg_file = [f"{args.docker_mount}/datastream-resources/config/{geo_base}"] diff --git a/research_datastream/terraform/GETTING_STARTED.md b/research_datastream/terraform/GETTING_STARTED.md index 514cbf597..f72757e72 100644 --- a/research_datastream/terraform/GETTING_STARTED.md +++ b/research_datastream/terraform/GETTING_STARTED.md @@ -87,7 +87,7 @@ Starting from execution_template_datastream. These options correspond directly t "subset_id" : "Gages-09106150", "hydrofabric_version" : "2.1.1", "s3_bucket" : "ngen_datstream", - "object_prefix" : "datastream_cloud_test" + "s3_prefix" : "datastream_cloud_test" } ``` @@ -100,7 +100,7 @@ Starting from execution_template_general_purpose.
Make sure to wrap commands in ``` ### Edit Run Options -The state machine is capable of confirming a complete execution by checking for the existence output data in the form of an s3 object. Set booleans here. If `s3_bucket` and `object_prefix` are provided in `datastream_command_options`, `ngen-datastream` will create a `ngen-run.tar.gz` file that can be found at `s3:////ngen-run.tar.gz` +The state machine is capable of confirming a complete execution by checking for the existence of output data in the form of an s3 object. Set booleans here. If `s3_bucket` and `s3_prefix` are provided in `datastream_command_options`, `ngen-datastream` will create a `ngen-run.tar.gz` file that can be found at `s3:////ngen-run.tar.gz` ``` "run_options":{ "ii_delete_volume" : false, diff --git a/research_datastream/terraform/executions/execution_datastream_example.json b/research_datastream/terraform/executions/execution_datastream_example.json index 471be3a49..d05683c91 100644 --- a/research_datastream/terraform/executions/execution_datastream_example.json +++ b/research_datastream/terraform/executions/execution_datastream_example.json @@ -9,7 +9,7 @@ "subset_id" : "Gages-09106150", "hydrofabric_version" : "2.1.1", "s3_bucket" : "my-bucket", - "object_prefix" : "test_directory" + "s3_prefix" : "test_directory" }, "run_options":{ "ii_delete_volume" : true, diff --git a/research_datastream/terraform/executions/execution_gp_example.json b/research_datastream/terraform/executions/execution_gp_example.json index 802a5b927..be6850305 100644 --- a/research_datastream/terraform/executions/execution_gp_example.json +++ b/research_datastream/terraform/executions/execution_gp_example.json @@ -4,7 +4,7 @@ ], "run_options":{ "ii_delete_volume" : false, - "ii_check_s3" : true + "ii_check_s3" : false }, "instance_parameters" : { diff --git a/research_datastream/terraform/executions/execution_template_datastream.json b/research_datastream/terraform/executions/execution_template_datastream.json index
eae3055d1..5b5cc5e51 100644 --- a/research_datastream/terraform/executions/execution_template_datastream.json +++ b/research_datastream/terraform/executions/execution_template_datastream.json @@ -9,7 +9,7 @@ "subset_id" : "", "hydrofabric_version" : "", "s3_bucket" : "", - "object_prefix" : "" + "s3_prefix" : "" }, "run_options":{ "ii_delete_volume" : true, diff --git a/research_datastream/terraform/lambda_functions/checker/lambda_function.py b/research_datastream/terraform/lambda_functions/checker/lambda_function.py index dbd0b9041..f5e715677 100644 --- a/research_datastream/terraform/lambda_functions/checker/lambda_function.py +++ b/research_datastream/terraform/lambda_functions/checker/lambda_function.py @@ -1,30 +1,56 @@ import boto3 import time +import re client_s3 = boto3.client('s3') -def wait_for_object_existence(bucket_name,object_key): +def wait_for_object_existence(bucket_name,prefix): - ii_obj_found = False - while not ii_obj_found: - try: - client_s3.head_object(Bucket=bucket_name, Key=object_key) - print(f"Key: '{object_key}' found!") - ii_obj_found = True - except: + while True: + response = client_s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix) + if 'Contents' in response: + print(f"Objects exist in bucket {bucket_name} at prefix {prefix}") + return + else: time.sleep(1) - if not ii_obj_found: raise Exception(f'{object_key} does not exist in {bucket_name} ') def lambda_handler(event, context): + bucket = None + prefix = None if event["run_options"]['ii_check_s3']: - if not 'datastream_command_options' in event: raise Exception(f'The lambda only knows how to check s3 object for datastream_command_options with s3_bucket and object_prefix set') - bucket = event['datastream_command_options']['s3_bucket'] - obj_key = event['datastream_command_options']['object_prefix'] + '/ngen-run.tar.gz' - print(f'Checking if {obj_key} exists in {bucket}') - wait_for_object_existence(bucket, obj_key) + if not 'datastream_command_options' in event: + for jcmd in
event["commands"]: + bucket_pattern = r"--s3_bucket[=\s']+([^\s']+)" + match = re.search(bucket_pattern, jcmd) + if match: + bucket = match.group(1) + prefix_pattern = r"--s3_prefix[=\s']+([^\s']+)" + match = re.search(prefix_pattern, jcmd) + if match: + prefix = match.group(1) + else: + bucket = event['datastream_command_options']['s3_bucket'] + prefix = event['datastream_command_options']['s3_prefix'] + if bucket is None or prefix is None: + raise Exception(f'User specified ii_check_s3, but s3_bucket or s3_prefix were not found in commands') + print(f'Checking if any objects with prefix {prefix} exist in {bucket}') + wait_for_object_existence(bucket, prefix) else: print(f'No s3 object check was performed.') return event + +if __name__ == "__main__": + + import argparse, json + parser = argparse.ArgumentParser() + parser.add_argument('--execution', dest="execution", type=str, help="",default = None) + args = parser.parse_args() + + with open(args.execution,'r') as fp: + execution = json.load(fp) + + lambda_handler(execution,"") + diff --git a/research_datastream/terraform/lambda_functions/streamcommander/lambda_function.py b/research_datastream/terraform/lambda_functions/streamcommander/lambda_function.py index 28a7696a0..609870a1e 100644 --- a/research_datastream/terraform/lambda_functions/streamcommander/lambda_function.py +++ b/research_datastream/terraform/lambda_functions/streamcommander/lambda_function.py @@ -38,7 +38,7 @@ def lambda_handler(event, context): event['commands'] = [] if "s3_bucket" in ds_options: bucket = ds_options["s3_bucket"] - prefix = ds_options["s3_prefix"] nprocs = ds_options["nprocs"] start = ds_options["start_time"] end = ds_options["end_time"]