Commit bbf2288

feat: implement check for gp execution

JordanLaserGit committed Oct 23, 2024
1 parent 9459307 commit bbf2288
Showing 7 changed files with 47 additions and 21 deletions.
2 changes: 1 addition & 1 deletion python_tools/src/python_tools/configure_datastream.py
@@ -178,7 +178,7 @@ def create_conf_fp(args):
     output_file_type = ["netcdf"]
     if len(args.s3_bucket) > 0:
         if "DAILY" in args.start_date:
-            args.s3_prefix = re.sub(r"\$DAILY",datetime.now(tz.timezone('US/Eastern')).strftime('%Y%m%d'),args.s3_prefix)
+            args.s3_prefix = re.sub(r"\DAILY",datetime.now(tz.timezone('US/Eastern')).strftime('%Y%m%d'),args.s3_prefix)
         output_path = f"s3://{args.s3_bucket}/{args.s3_prefix}"
     elif len(args.docker_mount) > 0:
         gpkg_file = [f"{args.docker_mount}/datastream-resources/config/{geo_base}"]
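For reference, a sketch of what the updated pattern does. In a regex, `\D` matches any single non-digit character, so `r"\DAILY"` still matches the literal substring `DAILY` (the `D` is the non-digit), with or without a leading `$`. A minimal illustration, assuming `tz` is `pytz` as in the surrounding module:

```python
import re
from datetime import datetime
import pytz as tz  # assumption: the module imports pytz under the alias tz

# r"\DAILY" = one non-digit (\D) followed by "AILY", i.e. the substring
# "DAILY"; re.sub swaps it for today's date stamp.
stamp = datetime.now(tz.timezone('US/Eastern')).strftime('%Y%m%d')
print(re.sub(r"\DAILY", stamp, "forecasts/DAILY/short_range"))
# -> forecasts/20241023/short_range (date varies)
```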
4 changes: 2 additions & 2 deletions research_datastream/terraform/GETTING_STARTED.md
@@ -87,7 +87,7 @@ Starting from execution_template_datastream. These options correspond directly t
     "subset_id" : "Gages-09106150",
     "hydrofabric_version" : "2.1.1",
     "s3_bucket" : "ngen_datstream",
-    "object_prefix" : "datastream_cloud_test"
+    "s3_prefix" : "datastream_cloud_test"
 }
 ```

@@ -100,7 +100,7 @@ Starting from execution_template_general_purpose. Make sure to wrap commands in
 ```
 
 ### Edit Run Options
-The state machine is capable of confirming a complete execution by checking for the existence output data in the form of an s3 object. Set booleans here. If `s3_bucket` and `object_prefix` are provided in `datastream_command_options`, `ngen-datastream` will create a `ngen-run.tar.gz` file that can be found at `s3://<s3_bucket>/<object_prefix>/ngen-run.tar.gz`
+The state machine is capable of confirming a complete execution by checking for the existence of output data in the form of an s3 object. Set booleans here. If `s3_bucket` and `s3_prefix` are provided in `datastream_command_options`, `ngen-datastream` will create a `ngen-run.tar.gz` file that can be found at `s3://<s3_bucket>/<s3_prefix>/ngen-run.tar.gz`
 ```
 "run_options":{
     "ii_delete_volume" : false,
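The check described above amounts to polling S3 until the output archive appears under the configured prefix. A minimal sketch of that idea, with the function name, poll interval, and example values assumed for illustration:

```python
import time
import boto3

client_s3 = boto3.client('s3')

def wait_for_ngen_run(bucket, prefix, poll_seconds=1):
    # Poll until ngen-run.tar.gz shows up at s3://<bucket>/<prefix>/.
    key = f"{prefix}/ngen-run.tar.gz"
    while True:
        response = client_s3.list_objects_v2(Bucket=bucket, Prefix=key)
        if 'Contents' in response:
            print(f"s3://{bucket}/{key} exists")
            return
        time.sleep(poll_seconds)

# Example values taken from the snippet above:
wait_for_ngen_run("ngen_datstream", "datastream_cloud_test")
```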
@@ -9,7 +9,7 @@
         "subset_id" : "Gages-09106150",
         "hydrofabric_version" : "2.1.1",
         "s3_bucket" : "my-bucket",
-        "object_prefix" : "test_directory"
+        "s3_prefix" : "test_directory"
     },
     "run_options":{
         "ii_delete_volume" : true,
@@ -4,7 +4,7 @@
     ],
     "run_options":{
         "ii_delete_volume" : false,
-        "ii_check_s3" : true
+        "ii_check_s3" : false
     },
     "instance_parameters" :
     {
@@ -9,7 +9,7 @@
         "subset_id" : "",
         "hydrofabric_version" : "",
         "s3_bucket" : "",
-        "object_prefix" : ""
+        "s3_prefix" : ""
     },
     "run_options":{
         "ii_delete_volume" : true,
@@ -1,30 +1,56 @@
 import boto3
 import time
+import re
 
 client_s3 = boto3.client('s3')
 
-def wait_for_object_existence(bucket_name,object_key):
+def wait_for_object_existence(bucket_name,prefix):
 
-    ii_obj_found = False
-    while not ii_obj_found:
-        try:
-            client_s3.head_object(Bucket=bucket_name, Key=object_key)
-            print(f"Key: '{object_key}' found!")
-            ii_obj_found = True
-        except:
+    while True:
+        response = client_s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
+        if 'Contents' in response:
+            print(f"Objects exist in bucket {bucket_name} at prefix {prefix}")
+            return
+        else:
             time.sleep(1)
-    if not ii_obj_found: raise Exception(f'{object_key} does not exist in {bucket_name} ')
 
 def lambda_handler(event, context):
 
+    bucket = None
+    prefix = None
     if event["run_options"]['ii_check_s3']:
-        if not 'datastream_command_options' in event: raise Exception(f'The lambda only knows how to check s3 object for datastream_command_options with s3_bucket and object_prefix set')
-        bucket = event['datastream_command_options']['s3_bucket']
-        obj_key = event['datastream_command_options']['object_prefix'] + '/ngen-run.tar.gz'
-        print(f'Checking if {obj_key} exists in {bucket}')
-        wait_for_object_existence(bucket, obj_key)
+        if not 'datastream_command_options' in event:
+            for jcmd in event["commands"]:
+                bucket_pattern = r"--s3_bucket[=\s']+([^\s']+)"
+                match = re.search(bucket_pattern, jcmd)
+                if match:
+                    bucket = match.group(1)
+                prefix_pattern = r"--s3_prefix[=\s']+([^\s']+)"
+                match = re.search(prefix_pattern, jcmd)
+                if match:
+                    prefix = match.group(1)
+        else:
+            bucket = event['datastream_command_options']['s3_bucket']
+            prefix = event['datastream_command_options']['s3_prefix']
+        if bucket is None or prefix is None:
+            raise Exception(f'User specified ii_check_s3, but no s3_bucket or s3_prefix were found in commands')
+        print(f'Checking if any objects with prefix {prefix} exist in {bucket}')
+        wait_for_object_existence(bucket, prefix)
     else:
         print(f'No s3 object check was performed.')
 
     return event
+
+if __name__ == "__main__":
+
+    import argparse, json
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--execution', dest="execution", type=str, help="",default = None)
+    args = parser.parse_args()
+
+    with open(args.execution,'r') as fp:
+        execution = json.load(fp)
+
+    lambda_handler(execution,"")
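With the new `__main__` hook, the handler can also be exercised locally against an execution JSON. A sketch of a minimal general-purpose event that the regex path above would parse (the script name and command string are assumed):

```python
# No datastream_command_options here, so the handler falls back to
# scraping --s3_bucket and --s3_prefix out of the command strings.
event = {
    "run_options": {"ii_check_s3": True},
    "commands": [
        "./run.sh --s3_bucket 'my-bucket' --s3_prefix 'test_directory'"
    ],
}
# lambda_handler(event, None)  # blocks until objects appear under the prefix
```

Equivalently, save such an event to a file and invoke the module with `--execution <path>`.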
@@ -38,7 +38,7 @@ def lambda_handler(event, context):
     event['commands'] = []
     if "s3_bucket" in ds_options:
         bucket = ds_options["s3_bucket"]
-        prefix = ds_options["object_prefix"]
+        prefix = ds_options["s3_prefix"]
         nprocs = ds_options["nprocs"]
         start = ds_options["start_time"]
         end = ds_options["end_time"]
