From 5e10dc05289bcf4eb2e6cd5ba546558fa48cc729 Mon Sep 17 00:00:00 2001 From: Jordan Laser Date: Mon, 5 Feb 2024 12:34:50 -0700 Subject: [PATCH] datastream lambdas --- cloud/AWS/execution_dailyrun.json | 4 +- cloud/AWS/forcingcommander/lambda_function.py | 104 ------------------ cloud/AWS/starter/lambda_function.py | 76 ------------- cloud/AWS/startup_ec2.sh | 4 +- cloud/AWS/stopper/lambda_function.py | 19 ---- cloud/AWS/tarballer/lambda_function.py | 55 --------- cloud/AWS/validator/lambda_function.py | 42 ------- 7 files changed, 5 insertions(+), 299 deletions(-) delete mode 100644 cloud/AWS/forcingcommander/lambda_function.py delete mode 100644 cloud/AWS/starter/lambda_function.py delete mode 100644 cloud/AWS/stopper/lambda_function.py delete mode 100644 cloud/AWS/tarballer/lambda_function.py delete mode 100644 cloud/AWS/validator/lambda_function.py diff --git a/cloud/AWS/execution_dailyrun.json b/cloud/AWS/execution_dailyrun.json index a508850d..b3732b38 100644 --- a/cloud/AWS/execution_dailyrun.json +++ b/cloud/AWS/execution_dailyrun.json @@ -1,7 +1,7 @@ { "instance_parameters" : { - "ImageId" : "ami-02cf2f3fa2111b506", + "ImageId" : "ami-04d28890d40332840", "InstanceType" : "c5n.18xlarge", "KeyName" : "Processor", "MinCount" : 1, @@ -33,7 +33,7 @@ }, "region" : "us-east-2", - "commands" : ["/home/ec2-user/ngen-datastream/scripts/stream.sh -c /home/ec2-user/ngen-datastream/configs/conf_datastream_daily.sh"], + "commands" : ["mount-s3 ngen-datastream /home/ec2-user/ngen-datastream/data/mount", "/home/ec2-user/ngen-datastream/scripts/stream.sh -c /home/ec2-user/ngen-datastream/configs/conf_datastream_daily.sh"], "bucket" : "ngen-datastream", "obj_key" : "daily/DATE/ngen-run/outputs/cat-1.csv" } diff --git a/cloud/AWS/forcingcommander/lambda_function.py b/cloud/AWS/forcingcommander/lambda_function.py deleted file mode 100644 index 6861f1a3..00000000 --- a/cloud/AWS/forcingcommander/lambda_function.py +++ /dev/null @@ -1,104 +0,0 @@ -import boto3 -import json -import time -import datetime - -client_s3 = boto3.client('s3') -client_ssm = boto3.client('ssm') - -def wait_for_command_response(response,instance_id): - iters = 0 - max_iters = 10 - while True: - try: - command_id = response['Command']['CommandId'] - output = client_ssm.get_command_invocation( - CommandId=command_id, - InstanceId=instance_id, - ) - print(f'Response obtained -> {output}') - break - except: - print(f'waiting for command response...') - time.sleep(1) - iters += 1 - if iters > max_iters: - print(f'FAILED') - break - -def get_conf(bucket_path,out_path): - conf_path = '/tmp/conf.json' - client_s3.download_file(bucket_path, out_path, conf_path) - with open(conf_path,'r') as fp: - data = json.load(fp) - return data - -def fix_time(data): - date = datetime.datetime.now() - date = date.strftime('%Y%m%d') - - hourminute = '0000' - - data['start_date'] = date + hourminute - data['end_date'] = date + hourminute - - return data, date - -def update_confs(bucket, run_type): - - conf_data = get_conf(bucket,f'conf_{run_type}_template.json') - conf_nwm_data = get_conf(bucket,f'conf_{run_type}_template_nwmfilenames.json') - - conf_data['forcing'], date = fix_time(conf_data['forcing']) - conf_nwm_data, _ = fix_time(conf_nwm_data) - - prefix = f"{run_type}/{date}" - conf_data['storage']['output_bucket_path'] = prefix - - conf_run = f'{run_type}.json' - with open('/tmp/' + conf_run,'w') as fp: - json.dump(conf_data,fp) - client_s3.upload_file('/tmp/' + conf_run, bucket, conf_run) - - conf_run = f'{run_type}_nwmfilenames.json' - with open('/tmp/' + conf_run,'w') as fp: - json.dump(conf_nwm_data,fp) - client_s3.upload_file('/tmp/' + conf_run, bucket, conf_run) - - print(f'The {run_type} config has been updated to date: {date}\nand output_bucket_path to {prefix}') - - return conf_data['storage']['output_bucket'], prefix - -def lambda_handler(event, context): - """ - Handler function to kick off the NWM 2 NGEN forcingprocessor - - """ - - instance_id = event['instance_parameters']['InstanceId'] - run_type = event['run_type'] - bucket = event['config_bucket'] - - command = \ - f"source /home/ec2-user/venv-datastream/bin/activate && " + \ - f"python /home/ec2-user/ngen-datastream/forcingprocessor/src/nwm_filenames_generator.py https://{bucket}.s3.us-east-2.amazonaws.com/conf_{run_type}_template_nwmfilenames.json && " + \ - f"python /home/ec2-user/ngen-datastream/forcingprocessor/src/forcingprocessor.py https://{bucket}.s3.us-east-2.amazonaws.com/conf_{run_type}_template.json" - - output_bucket, prefix = update_confs( - bucket, - run_type - ) - - response = client_ssm.send_command( - InstanceIds=[instance_id], - DocumentName='AWS-RunShellScript', - Parameters={'commands': [command]} - ) - wait_for_command_response(response,instance_id) - print(f'{instance_id} is launched and processing forcings') - - event['command_id'] = response['Command']['CommandId'] - event['bucket'] = output_bucket - event['prefix'] = prefix - - return event diff --git a/cloud/AWS/starter/lambda_function.py b/cloud/AWS/starter/lambda_function.py deleted file mode 100644 index 2f377d37..00000000 --- a/cloud/AWS/starter/lambda_function.py +++ /dev/null @@ -1,76 +0,0 @@ -import boto3 -import time - -client_ec2 = boto3.client('ec2') -client_ssm = boto3.client('ssm') - -def wait_for_instance_status(instance_id, status, max_retries=120): - retries = 0 - while retries < max_retries: - instance_info = client_ssm.describe_instance_information( - InstanceInformationFilterList=[ - { - 'key': 'InstanceIds', - 'valueSet': [instance_id], - }, - ] - ) - if instance_info['InstanceInformationList'] and instance_info['InstanceInformationList'][0]['PingStatus'] == status: - return True - time.sleep(1) - retries += 1 - return False - -def lambda_handler(event, context): - - # with open('./userdata.sh', 'r') as fp: - # userdata = fp.read() - - user_data = '''#!/bin/bash - -# cli command to start instance -# aws ec2 run-instances --user-data file://user_data.txt --instance-type c5n.18xlarge --count 1 --image-id ami-08cba41c585e4a2e2 --region us-east-2 --key-name Processor --iam-instance-profile '{"Name":"Processor"}' --security-group-ids "sg-0fc864d44ef677a07" --profile jlaser_ciroh - -echo "EXECUTING USER DATA" - -cd /home/ec2-user - -sudo dnf install git -y -python3 -m venv ./venv-datastream -git clone https://github.com/CIROH-UA/ngen-datastream.git -source ./venv-datastream/bin/activate && pip3 install --upgrade pip -pip3 install -r ./ngen-datastream/requirements.txt -deactivate - -python3 -m venv ./venv-ngen-cal -git clone --branch run_folder_validation https://github.com/JordanLaserGit/ngen-cal.git -source ./venv-ngen-cal/bin/activate && pip3 install --upgrade pip -pip3 install -r ./ngen-cal/requirements.txt -pip3 install -e ./ngen-cal/python/ngen_conf -deactivate - -# sudo dnf install go -y -# go install github.com/aaraney/ht@latest - -touch /tmp/userdata_complete - -echo "USERDATA COMPLETE" - - ''' - - params = event['instance_parameters'] - params['UserData'] = user_data - - response = client_ec2.run_instances(**params) - instance_id = response['Instances'][0]['InstanceId'] - - client_ec2.start_instances(InstanceIds=[instance_id]) - if not wait_for_instance_status(instance_id, 'Online'): - raise Exception(f"EC2 instance {instance_id} did not reach 'Online' state") - print(f'{instance_id} has been launched and running') - - event['instance_parameters']['InstanceId'] = instance_id - - time.sleep(120) - - return event diff --git a/cloud/AWS/startup_ec2.sh b/cloud/AWS/startup_ec2.sh index ff1da992..7620b5ac 100644 --- a/cloud/AWS/startup_ec2.sh +++ b/cloud/AWS/startup_ec2.sh @@ -6,7 +6,7 @@ tar -xzvf hfsubset-linux_amd64.tar.gz sudo mv hfsubset /usr/bin/hfsubset git clone https://github.com/CIROH-UA/ngen-datastream.git python3 -m pip install --upgrade pip -pip3 install -r $PACAKGE_DIR/requirements.txt --no-cache +pip3 install -r /home/ec2-user/ngen-datastream/python/requirements.txt --no-cache aws configure aws configure set s3.max_concurrent_requests 256 mkdir docker @@ -16,6 +16,8 @@ sudo dnf -y install dnf-plugins-core sudo dnf install docker -y sudo systemctl start docker sudo usermod -aG docker ${USER} +curl -L -O https://s3.amazonaws.com/mountpoint-s3-release/latest/x86_64/mount-s3.rpm +sudo yum install -y ./mount-s3.rpm echo "cd docker && sudo docker build -t awiciroh/ngen-deps:latest -f Dockerfile.ngen-deps --no-cache . && docker build -t awiciroh/t-route:latest -f Dockerfile.t-route . --no-cache && docker build -t awiciroh/ngen -f Dockerfile.ngen . --no-cache && docker build -t awiciroh/ciroh-ngen-image:latest-local -f Dockerfile . --no-cache" echo "copy that ^^ and log out of session, log back in and run that command" diff --git a/cloud/AWS/stopper/lambda_function.py b/cloud/AWS/stopper/lambda_function.py deleted file mode 100644 index 2d76c84e..00000000 --- a/cloud/AWS/stopper/lambda_function.py +++ /dev/null @@ -1,19 +0,0 @@ -import boto3 - -client_ec2 = boto3.client('ec2') - -def lambda_handler(event, context): - """ - Generic Poller funcion - """ - - instance_id = event['instance_parameters']['InstanceId'] - - print(f'Shutting down processor {instance_id}') - client_ec2.stop_instances(InstanceIds=[instance_id]) - - waiter = client_ec2.get_waiter('instance_stopped') - waiter.wait(InstanceIds=[instance_id]) - - print(f'Instance {instance_id} has been successfully stopped.') - diff --git a/cloud/AWS/tarballer/lambda_function.py b/cloud/AWS/tarballer/lambda_function.py deleted file mode 100644 index 61b0f264..00000000 --- a/cloud/AWS/tarballer/lambda_function.py +++ /dev/null @@ -1,55 +0,0 @@ -import boto3 -import tarfile -import os -import datetime - -client_s3 = boto3.client('s3') - -def lambda_handler(event, context): - - tar1_bucket = event['bucket'] - tar1_prefix = event['prefix'] - tar1_key = event['obj_key'] - - tar2_bucket = event['config_bucket'] - run_type = event['run_type'] - - tar1_filename = tar1_key.split('/')[-1] - tar1_tar_path = f'/tmp/{tar1_filename}' - client_s3.download_file(tar1_bucket, tar1_prefix + tar1_key, tar1_tar_path) - - tar2_key = f"{run_type}.tar.gz" - tar2_tar_path = f'/tmp/{tar2_key}' - client_s3.download_file(tar2_bucket, tar2_key, tar2_tar_path) - - date = datetime.datetime.now() - date = date.strftime('%Y%m%d') - new_tar_name = f'{run_type}_{date}.tar.gz' - new_tar = f'/tmp/{new_tar_name}' - new_tar_key = tar1_prefix + '/' + new_tar_name - - os.system(f'touch {new_tar}') - - with tarfile.open(new_tar,'w:gz') as nw_tar: - - with tarfile.open(tar2_tar_path,'r:gz') as tar2: - confs = [ - tarinfo for tarinfo in tar2.getmembers() - ] - - with tarfile.open(tar1_tar_path,'r:gz') as tar1_tar: - forcings = tar1_tar.getmembers() - all_files = confs + forcings - - for jfile in all_files: - name = jfile.name - if name.find('.csv') >= 0: - obj = tar1_tar.extractfile(name) - nw_tar.addfile(jfile,obj) - else: - nw_tar.addfile(jfile,tar2.extractfile(name)) - - client_s3.upload_file(new_tar, tar1_bucket, new_tar_key) - - event["complete_tarball_key"] = new_tar_key - return event \ No newline at end of file diff --git a/cloud/AWS/validator/lambda_function.py b/cloud/AWS/validator/lambda_function.py deleted file mode 100644 index 10065e6d..00000000 --- a/cloud/AWS/validator/lambda_function.py +++ /dev/null @@ -1,42 +0,0 @@ -import boto3 -import time - -client_ssm = boto3.client('ssm') - -def wait_for_command_response(response,instance_id): - iters = 0 - max_iters = 10 - while True: - try: - command_id = response['Command']['CommandId'] - output = client_ssm.get_command_invocation( - CommandId=command_id, - InstanceId=instance_id, - ) - print(f'Response obtained -> {output}') - break - except: - print(f'waiting for command response...') - time.sleep(1) - iters += 1 - if iters > max_iters: - print(f'FAILED') - break - -def lambda_handler(event, context): - - instance_id = event['instance_parameters']['InstanceId'] - ngen_input_bucket = event['bucket'] - ngen_input_key = event['complete_tarball_key'] - - command = f'source /home/ec2-user/ngen-cal-venv/bin/activate && python /home/ec2-user/ngen-cal/python/conf_validation.py --tarball' - response = client_ssm.send_command( - InstanceIds=[instance_id], - DocumentName='AWS-RunShellScript', - Parameters={'commands': [command]} - ) - wait_for_command_response(response,instance_id) - - event['command_id'] = response['Command']['CommandId'] - - return event \ No newline at end of file