From 34f51fec9f816a8898a090dcdab5308d963ebfb0 Mon Sep 17 00:00:00 2001 From: Jordan Laser Date: Mon, 5 Feb 2024 11:54:37 -0700 Subject: [PATCH] datastream lambdas --- cloud/AWS/checker/lambda_function.py | 3 +- cloud/AWS/execution_dailyrun.json | 35 +++++++++++--- cloud/AWS/start_ami/lambda_handler.py | 46 ++++++++++++++++++ cloud/AWS/streamcommander/lambda_handler.py | 52 +++++++++++++++++++++ 4 files changed, 127 insertions(+), 9 deletions(-) create mode 100644 cloud/AWS/start_ami/lambda_handler.py create mode 100644 cloud/AWS/streamcommander/lambda_handler.py diff --git a/cloud/AWS/checker/lambda_function.py b/cloud/AWS/checker/lambda_function.py index eb05eb86..f734849a 100644 --- a/cloud/AWS/checker/lambda_function.py +++ b/cloud/AWS/checker/lambda_function.py @@ -22,9 +22,8 @@ def wait_for_object_existence(bucket_name,object_key): def lambda_handler(event, context): bucket = event['bucket'] - prefix = event['prefix'] obj_key = event['obj_key'] - wait_for_object_existence(bucket, prefix + obj_key) + wait_for_object_existence(bucket, obj_key) print(f'{obj_key} exists! Success!') return event diff --git a/cloud/AWS/execution_dailyrun.json b/cloud/AWS/execution_dailyrun.json index 2d8c0a4a..a508850d 100644 --- a/cloud/AWS/execution_dailyrun.json +++ b/cloud/AWS/execution_dailyrun.json @@ -1,18 +1,39 @@ { "instance_parameters" : { - "ImageId" : "ami-08cba41c585e4a2e2", - "InstanceType" : "c5n.2xlarge", + "ImageId" : "ami-02cf2f3fa2111b506", + "InstanceType" : "c5n.18xlarge", "KeyName" : "Processor", "MinCount" : 1, "MaxCount" : 1, - "SecurityGroupIds" : ["sg-0fc864d44ef677a07"], + "SecurityGroupIds" : ["sg-066e56297c706ac84"], "IamInstanceProfile" : { "Name" : "Processor" - } + }, + "TagSpecifications" :[ + { + "ResourceType": "instance", + "Tags": [ + { + "Key" : "Name", + "Value" : "ngendatastream_DATE" + } + ] + } + ], + "BlockDeviceMappings":[ + { + "DeviceName": "/dev/xvda", + "Ebs": { + "VolumeSize": 128, + "VolumeType": "gp2" + } + } + ] }, - "config_bucket" : "ngenresourcesdev", - "run_type" : "dailyrun", - "obj_key" : "/forcings/forcings.tar.gz" + "region" : "us-east-2", + "commands" : ["/home/ec2-user/ngen-datastream/scripts/stream.sh -c /home/ec2-user/ngen-datastream/configs/conf_datastream_daily.sh"], + "bucket" : "ngen-datastream", + "obj_key" : "daily/DATE/ngen-run/outputs/cat-1.csv" } diff --git a/cloud/AWS/start_ami/lambda_handler.py b/cloud/AWS/start_ami/lambda_handler.py new file mode 100644 index 00000000..9d1b9307 --- /dev/null +++ b/cloud/AWS/start_ami/lambda_handler.py @@ -0,0 +1,46 @@ +import boto3 +import time +from datetime import datetime + +def wait_for_instance_status(instance_id, status, max_retries=120): + retries = 0 + while retries < max_retries: + instance_info = client_ssm.describe_instance_information( + InstanceInformationFilterList=[ + { + 'key': 'InstanceIds', + 'valueSet': [instance_id], + }, + ] + ) + if instance_info['InstanceInformationList'] and instance_info['InstanceInformationList'][0]['PingStatus'] == status: + return True + time.sleep(1) + retries += 1 + return False + +def lambda_handler(event, context): + + global client_ec2, client_ssm + client_ec2 = boto3.client('ec2',event['region']) + client_ssm = boto3.client('ssm',event['region']) + + params = event['instance_parameters'] + + date = datetime.now() + date_fmt = date.strftime('%Y%m%d') + if 'DATE' in params['TagSpecifications'][0]['Tags'][0]['Value']: + key_str = params['TagSpecifications'][0]['Tags'][0]['Value'] + params['TagSpecifications'][0]['Tags'][0]['Value'] = key_str.replace('DATE',date_fmt) + + response = client_ec2.run_instances(**params) + instance_id = response['Instances'][0]['InstanceId'] + + client_ec2.start_instances(InstanceIds=[instance_id]) + if not wait_for_instance_status(instance_id, 'Online'): + raise Exception(f"EC2 instance {instance_id} did not reach 'Online' state") + print(f'{instance_id} has been launched and running') + + event['instance_parameters']['InstanceId'] = instance_id + + return event diff --git a/cloud/AWS/streamcommander/lambda_handler.py b/cloud/AWS/streamcommander/lambda_handler.py new file mode 100644 index 00000000..11622913 --- /dev/null +++ b/cloud/AWS/streamcommander/lambda_handler.py @@ -0,0 +1,52 @@ +import boto3 +import time +from datetime import datetime + +client_s3 = boto3.client('s3') +client_ssm = boto3.client('ssm') + +def wait_for_command_response(response,instance_id): + iters = 0 + max_iters = 10 + while True: + try: + command_id = response['Command']['CommandId'] + output = client_ssm.get_command_invocation( + CommandId=command_id, + InstanceId=instance_id, + ) + print(f'Response obtained -> {output}') + break + except: + print(f'waiting for command response...') + time.sleep(1) + iters += 1 + if iters > max_iters: + print(f'FAILED') + break + +def lambda_handler(event, context): + """ + Handler function to issue commands to an ec2 + + """ + + instance_id = event['instance_parameters']['InstanceId'] + + response = client_ssm.send_command( + InstanceIds=[instance_id], + DocumentName='AWS-RunShellScript', + Parameters={'commands': event['commands']} + ) + wait_for_command_response(response,instance_id) + print(f'{instance_id} is launched and processing forcings') + + event['command_id'] = response['Command']['CommandId'] + + date = datetime.now() + date_fmt = date.strftime('%Y%m%d') + if 'DATE' in event['obj_key']: + key_str = event['obj_key'] + event['obj_key'] = key_str.replace('DATE',date_fmt) + + return event