Skip to content

Commit

Permalink
datastream lambdas
Browse files Browse the repository at this point in the history
  • Loading branch information
JordanLaserGit committed Feb 5, 2024
1 parent 668d235 commit 34f51fe
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 9 deletions.
3 changes: 1 addition & 2 deletions cloud/AWS/checker/lambda_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ def wait_for_object_existence(bucket_name,object_key):
def lambda_handler(event, context):

bucket = event['bucket']
prefix = event['prefix']
obj_key = event['obj_key']
wait_for_object_existence(bucket, prefix + obj_key)
wait_for_object_existence(bucket, obj_key)

print(f'{obj_key} exists! Success!')
return event
Expand Down
35 changes: 28 additions & 7 deletions cloud/AWS/execution_dailyrun.json
Original file line number Diff line number Diff line change
@@ -1,18 +1,39 @@
{
"instance_parameters" :
{
"ImageId" : "ami-08cba41c585e4a2e2",
"InstanceType" : "c5n.2xlarge",
"ImageId" : "ami-02cf2f3fa2111b506",
"InstanceType" : "c5n.18xlarge",
"KeyName" : "Processor",
"MinCount" : 1,
"MaxCount" : 1,
"SecurityGroupIds" : ["sg-0fc864d44ef677a07"],
"SecurityGroupIds" : ["sg-066e56297c706ac84"],
"IamInstanceProfile" : {
"Name" : "Processor"
}
},
"TagSpecifications" :[
{
"ResourceType": "instance",
"Tags": [
{
"Key" : "Name",
"Value" : "ngendatastream_DATE"
}
]
}
],
"BlockDeviceMappings":[
{
"DeviceName": "/dev/xvda",
"Ebs": {
"VolumeSize": 128,
"VolumeType": "gp2"
}
}
]
},

"config_bucket" : "ngenresourcesdev",
"run_type" : "dailyrun",
"obj_key" : "/forcings/forcings.tar.gz"
"region" : "us-east-2",
"commands" : ["/home/ec2-user/ngen-datastream/scripts/stream.sh -c /home/ec2-user/ngen-datastream/configs/conf_datastream_daily.sh"],
"bucket" : "ngen-datastream",
"obj_key" : "daily/DATE/ngen-run/outputs/cat-1.csv"
}
46 changes: 46 additions & 0 deletions cloud/AWS/start_ami/lambda_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import boto3
import time
from datetime import datetime

def wait_for_instance_status(instance_id, status, max_retries=120):
retries = 0
while retries < max_retries:
instance_info = client_ssm.describe_instance_information(
InstanceInformationFilterList=[
{
'key': 'InstanceIds',
'valueSet': [instance_id],
},
]
)
if instance_info['InstanceInformationList'] and instance_info['InstanceInformationList'][0]['PingStatus'] == status:
return True
time.sleep(1)
retries += 1
return False

def lambda_handler(event, context):

global client_ec2, client_ssm
client_ec2 = boto3.client('ec2',event['region'])
client_ssm = boto3.client('ssm',event['region'])

params = event['instance_parameters']

date = datetime.now()
date_fmt = date.strftime('%Y%m%d')
if 'DATE' in params['TagSpecifications'][0]['Tags'][0]['Value']:
key_str = params['TagSpecifications'][0]['Tags'][0]['Value']
params['TagSpecifications'][0]['Tags'][0]['Value'] = key_str.replace('DATE',date_fmt)

response = client_ec2.run_instances(**params)
instance_id = response['Instances'][0]['InstanceId']

client_ec2.start_instances(InstanceIds=[instance_id])
if not wait_for_instance_status(instance_id, 'Online'):
raise Exception(f"EC2 instance {instance_id} did not reach 'Online' state")
print(f'{instance_id} has been launched and running')

event['instance_parameters']['InstanceId'] = instance_id

return event
52 changes: 52 additions & 0 deletions cloud/AWS/streamcommander/lambda_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import boto3
import time
from datetime import datetime

client_s3 = boto3.client('s3')
client_ssm = boto3.client('ssm')

def wait_for_command_response(response,instance_id):
iters = 0
max_iters = 10
while True:
try:
command_id = response['Command']['CommandId']
output = client_ssm.get_command_invocation(
CommandId=command_id,
InstanceId=instance_id,
)
print(f'Response obtained -> {output}')
break
except:
print(f'waiting for command response...')
time.sleep(1)
iters += 1
if iters > max_iters:
print(f'FAILED')
break

def lambda_handler(event, context):
"""
Handler function to issue commands to an ec2
"""

instance_id = event['instance_parameters']['InstanceId']

response = client_ssm.send_command(
InstanceIds=[instance_id],
DocumentName='AWS-RunShellScript',
Parameters={'commands': event['commands']}
)
wait_for_command_response(response,instance_id)
print(f'{instance_id} is launched and processing forcings')

event['command_id'] = response['Command']['CommandId']

date = datetime.now()
date_fmt = date.strftime('%Y%m%d')
if 'DATE' in event['obj_key']:
key_str = event['obj_key']
event['obj_key'] = key_str.replace('DATE',date_fmt)

return event

0 comments on commit 34f51fe

Please sign in to comment.