diff --git a/.github/workflows/deploy-multi-burst-sandbox.yml b/.github/workflows/deploy-multi-burst-sandbox.yml new file mode 100644 index 000000000..098245c2a --- /dev/null +++ b/.github/workflows/deploy-multi-burst-sandbox.yml @@ -0,0 +1,85 @@ +name: Deploy Multi-Burst Sandbox Stack to AWS + +on: + push: + branches: + - multi-burst-sandbox + +concurrency: ${{ github.workflow }}-${{ github.ref }} + +jobs: + deploy: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + include: + - environment: hyp3-multi-burst-sandbox + domain: hyp3-multi-burst-sandbox.asf.alaska.edu + template_bucket: cf-templates-1hz9ldhhl4ahu-us-west-2 + image_tag: test + product_lifetime_in_days: 14 + default_credits_per_user: 0 + default_application_status: APPROVED + cost_profile: DEFAULT + deploy_ref: refs/heads/multi-burst-sandbox + job_files: >- + job_spec/INSAR_ISCE_BURST.yml + job_spec/INSAR_ISCE_MULTI_BURST.yml + job_spec/AUTORIFT.yml + job_spec/RTC_GAMMA.yml + job_spec/WATER_MAP.yml + job_spec/WATER_MAP_EQ.yml + instance_types: r6id.xlarge,r6id.2xlarge,r6id.4xlarge,r6id.8xlarge,r6idn.xlarge,r6idn.2xlarge,r6idn.4xlarge,r6idn.8xlarge + default_max_vcpus: 640 + expanded_max_vcpus: 640 + required_surplus: 0 + security_environment: ASF + ami_id: /aws/service/ecs/optimized-ami/amazon-linux-2023/recommended/image_id + distribution_url: '' + + environment: + name: ${{ matrix.environment }} + url: https://${{ matrix.domain }} + + steps: + - uses: actions/checkout@v4.1.7 + + - uses: aws-actions/configure-aws-credentials@v4 + with: + aws-access-key-id: ${{ secrets.V2_AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.V2_AWS_SECRET_ACCESS_KEY }} + aws-session-token: ${{ secrets.V2_AWS_SESSION_TOKEN }} + aws-region: ${{ secrets.AWS_REGION }} + + - uses: actions/setup-python@v5 + with: + python-version: 3.9 + + - uses: ./.github/actions/deploy-hyp3 + with: + TEMPLATE_BUCKET: ${{ matrix.template_bucket }} + STACK_NAME: ${{ matrix.environment }} + DOMAIN_NAME: ${{ matrix.domain }} + API_NAME: ${{ matrix.environment }} + CERTIFICATE_ARN: ${{ secrets.CERTIFICATE_ARN }} + IMAGE_TAG: ${{ matrix.image_tag }} + PRODUCT_LIFETIME: ${{ matrix.product_lifetime_in_days }} + VPC_ID: ${{ secrets.VPC_ID }} + SUBNET_IDS: ${{ secrets.SUBNET_IDS }} + SECRET_ARN: ${{ secrets.SECRET_ARN }} + CLOUDFORMATION_ROLE_ARN: ${{ secrets.CLOUDFORMATION_ROLE_ARN }} + DEFAULT_CREDITS_PER_USER: ${{ matrix.default_credits_per_user }} + DEFAULT_APPLICATION_STATUS: ${{ matrix.default_application_status }} + COST_PROFILE: ${{ matrix.cost_profile }} + JOB_FILES: ${{ matrix.job_files }} + DEFAULT_MAX_VCPUS: ${{ matrix.default_max_vcpus }} + EXPANDED_MAX_VCPUS: ${{ matrix.expanded_max_vcpus }} + MONTHLY_BUDGET: ${{ secrets.MONTHLY_BUDGET }} + REQUIRED_SURPLUS: ${{ matrix.required_surplus }} + ORIGIN_ACCESS_IDENTITY_ID: ${{ secrets.ORIGIN_ACCESS_IDENTITY_ID }} + SECURITY_ENVIRONMENT: ${{ matrix.security_environment }} + AMI_ID: ${{ matrix.ami_id }} + INSTANCE_TYPES: ${{ matrix.instance_types }} + DISTRIBUTION_URL: ${{ matrix.distribution_url }} + AUTH_PUBLIC_KEY: ${{ secrets.AUTH_PUBLIC_KEY }} diff --git a/CHANGELOG.md b/CHANGELOG.md index bc4245065..7ebf1b3ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,17 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [7.10.0] + +### Added +- Added a new `INSAR_ISCE_MULTI_BURST` job type for running multi-burst InSAR. Currently, this job type is restricted to a special `hyp3-multi-burst-sandbox` deployment for HyP3 operators. However, this is an important step toward eventually making multi-burst InSAR available for general users. + +### Changed +- Job validator functions now accept two parameters: the job dictionary and the granule metadata. +- Granule metadata validation now supports `reference` and `secondary` job parameters in addition to the existing `granules` parameter. +- Burst InSAR validators now support multi-burst jobs. +- Replaced the step function's `INSPECT_MEMORY_REQUIREMENTS` step with a new `SET_BATCH_OVERRIDES` step, which calls a Lambda function to dynamically calculate [Batch container overrides](https://docs.aws.amazon.com/batch/latest/APIReference/API_ContainerOverrides.html) based on job type and parameters. + ## [7.9.3] ### Fixed - Added missing cloudformation:DeleteStack permission to cloudformation deployment role in ASF-deployment-ci-cf.yml . diff --git a/Makefile b/Makefile index c97f2c118..c76c9a847 100644 --- a/Makefile +++ b/Makefile @@ -2,6 +2,7 @@ API = ${PWD}/apps/api/src CHECK_PROCESSING_TIME = ${PWD}/apps/check-processing-time/src GET_FILES = ${PWD}/apps/get-files/src HANDLE_BATCH_EVENT = ${PWD}/apps/handle-batch-event/src +SET_BATCH_OVERRIDES = ${PWD}/apps/set-batch-overrides/src SCALE_CLUSTER = ${PWD}/apps/scale-cluster/src START_EXECUTION_MANAGER = ${PWD}/apps/start-execution-manager/src START_EXECUTION_WORKER = ${PWD}/apps/start-execution-worker/src @@ -9,7 +10,7 @@ DISABLE_PRIVATE_DNS = ${PWD}/apps/disable-private-dns/src UPDATE_DB = ${PWD}/apps/update-db/src UPLOAD_LOG = ${PWD}/apps/upload-log/src DYNAMO = ${PWD}/lib/dynamo -export PYTHONPATH = ${API}:${CHECK_PROCESSING_TIME}:${GET_FILES}:${HANDLE_BATCH_EVENT}:${SCALE_CLUSTER}:${START_EXECUTION_MANAGER}:${START_EXECUTION_WORKER}:${DISABLE_PRIVATE_DNS}:${UPDATE_DB}:${UPLOAD_LOG}:${DYNAMO} +export PYTHONPATH = ${API}:${CHECK_PROCESSING_TIME}:${GET_FILES}:${HANDLE_BATCH_EVENT}:${SET_BATCH_OVERRIDES}:${SCALE_CLUSTER}:${START_EXECUTION_MANAGER}:${START_EXECUTION_WORKER}:${DISABLE_PRIVATE_DNS}:${UPDATE_DB}:${UPLOAD_LOG}:${DYNAMO} build: render @@ -44,7 +45,7 @@ render: static: flake8 openapi-validate cfn-lint flake8: - flake8 --ignore=E731 --max-line-length=120 --import-order-style=pycharm --statistics --application-import-names hyp3_api,get_files,handle_batch_event,check_processing_time,start_execution_manager,start_execution_worker,disable_private_dns,update_db,upload_log,dynamo,lambda_logging,scale_cluster apps tests lib + flake8 --ignore=E731 --max-line-length=120 --import-order-style=pycharm --statistics --application-import-names hyp3_api,get_files,handle_batch_event,set_batch_overrides,check_processing_time,start_execution_manager,start_execution_worker,disable_private_dns,update_db,upload_log,dynamo,lambda_logging,scale_cluster apps tests lib openapi-validate: render openapi-spec-validator apps/api/src/hyp3_api/api-spec/openapi-spec.yml diff --git a/apps/api/src/hyp3_api/util.py b/apps/api/src/hyp3_api/util.py index f5ecd6a88..78d2dd02b 100644 --- a/apps/api/src/hyp3_api/util.py +++ b/apps/api/src/hyp3_api/util.py @@ -9,7 +9,12 @@ class TokenDeserializeError(Exception): def get_granules(jobs: list[dict]) -> set[str]: - return {granule for job in jobs for granule in job['job_parameters'].get('granules', [])} + return { + granule + for key in ['granules', 'reference', 'secondary'] + for job in jobs + for granule in job['job_parameters'].get(key, []) + } def serialize(payload: dict): diff --git a/apps/api/src/hyp3_api/validation.py b/apps/api/src/hyp3_api/validation.py index f935219d5..f5e40aaf2 100644 --- a/apps/api/src/hyp3_api/validation.py +++ b/apps/api/src/hyp3_api/validation.py @@ -1,6 +1,7 @@ import json import os import sys +from copy import deepcopy from pathlib import Path import requests @@ -70,31 +71,47 @@ def check_granules_exist(granules, granule_metadata): raise GranuleValidationError(f'Some requested scenes could not be found: {", ".join(not_found_granules)}') -def check_dem_coverage(granule_metadata): +def check_dem_coverage(_, granule_metadata): bad_granules = [g['name'] for g in granule_metadata if not has_sufficient_coverage(g['polygon'])] if bad_granules: raise GranuleValidationError(f'Some requested scenes do not have DEM coverage: {", ".join(bad_granules)}') -def check_same_burst_ids(granule_metadata): - ref_burst_id, sec_burst_id = [granule['name'].split('_')[1] for granule in granule_metadata] - if ref_burst_id != sec_burst_id: +def check_same_burst_ids(job, _): + refs = job['job_parameters']['reference'] + secs = job['job_parameters']['secondary'] + ref_ids = ['_'.join(ref.split('_')[1:3]) for ref in refs] + sec_ids = ['_'.join(sec.split('_')[1:3]) for sec in secs] + if len(ref_ids) != len(sec_ids): raise GranuleValidationError( - f'The requested scenes do not have the same burst ID: {ref_burst_id} and {sec_burst_id}' + f'Number of reference and secondary scenes must match, got: ' + f'{len(ref_ids)} references and {len(sec_ids)} secondaries' + ) + for i in range(len(ref_ids)): + if ref_ids[i] != sec_ids[i]: + raise GranuleValidationError( + f'Burst IDs do not match for {refs[i]} and {secs[i]}.' + ) + if len(set(ref_ids)) != len(ref_ids): + duplicate_pair_id = next(ref_id for ref_id in ref_ids if ref_ids.count(ref_id) > 1) + raise GranuleValidationError( + f'The requested scenes have more than 1 pair with the following burst ID: {duplicate_pair_id}.' ) -def check_valid_polarizations(granule_metadata): - ref_polarization, sec_polarization = [granule['name'].split('_')[4] for granule in granule_metadata] - if ref_polarization != sec_polarization: +def check_valid_polarizations(job, _): + polarizations = set(granule.split('_')[4] for granule in get_granules([job])) + if len(polarizations) > 1: + raise GranuleValidationError( + f'The requested scenes need to have the same polarization, got: {", ".join(polarizations)}' + ) + if not polarizations.issubset({'VV', 'HH'}): raise GranuleValidationError( - f'The requested scenes do not have the same polarization: {ref_polarization} and {sec_polarization}' + f'Only VV and HH polarizations are currently supported, got: {polarizations.pop()}' ) - if ref_polarization != 'VV' and ref_polarization != 'HH': - raise GranuleValidationError(f'Only VV and HH polarizations are currently supported, got: {ref_polarization}') -def check_not_antimeridian(granule_metadata): +def check_not_antimeridian(_, granule_metadata): for granule in granule_metadata: bbox = granule['polygon'].bounds if abs(bbox[0] - bbox[2]) > 180.0 and bbox[0] * bbox[2] < 0.0: @@ -119,7 +136,20 @@ def get_multipolygon_from_geojson(input_file): return MultiPolygon(polygons) -def validate_jobs(jobs): +# TODO https://github.com/ASFHyP3/hyp3/issues/2442 remove this function after two burst types are merged +def convert_single_burst_jobs(jobs: list[dict]) -> list[dict]: + jobs = deepcopy(jobs) + for job in jobs: + if job['job_type'] == 'INSAR_ISCE_BURST': + job_parameters = job['job_parameters'] + ref, sec = job_parameters.pop('granules') + job_parameters['reference'], job_parameters['secondary'] = [ref], [sec] + return jobs + + +def validate_jobs(jobs: list[dict]) -> None: + jobs = convert_single_burst_jobs(jobs) + granules = get_granules(jobs) granule_metadata = get_cmr_metadata(granules) @@ -129,4 +159,4 @@ def validate_jobs(jobs): job_granule_metadata = [granule for granule in granule_metadata if granule['name'] in get_granules([job])] module = sys.modules[__name__] validator = getattr(module, validator_name) - validator(job_granule_metadata) + validator(job, job_granule_metadata) diff --git a/apps/set-batch-overrides/set-batch-overrides-cf.yml.j2 b/apps/set-batch-overrides/set-batch-overrides-cf.yml.j2 new file mode 100644 index 000000000..57362c9bf --- /dev/null +++ b/apps/set-batch-overrides/set-batch-overrides-cf.yml.j2 @@ -0,0 +1,76 @@ +AWSTemplateFormatVersion: 2010-09-09 + +{% if security_environment == 'EDC' %} +Parameters: + + SecurityGroupId: + Type: String + + SubnetIds: + Type: CommaDelimitedList +{% endif %} + +Outputs: + + LambdaArn: + Value: !GetAtt Lambda.Arn + +Resources: + + LogGroup: + Type: AWS::Logs::LogGroup + Properties: + LogGroupName: !Sub "/aws/lambda/${Lambda}" + RetentionInDays: 90 + + Role: + Type: {{ 'Custom::JplRole' if security_environment in ('JPL', 'JPL-public') else 'AWS::IAM::Role' }} + Properties: + {% if security_environment in ('JPL', 'JPL-public') %} + ServiceToken: !ImportValue Custom::JplRole::ServiceToken + Path: /account-managed/hyp3/ + {% endif %} + AssumeRolePolicyDocument: + Version: 2012-10-17 + Statement: + Action: sts:AssumeRole + Principal: + Service: lambda.amazonaws.com + Effect: Allow + ManagedPolicyArns: + - !Ref Policy + {% if security_environment == 'EDC' %} + - arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole + {% endif %} + + Policy: + Type: {{ 'Custom::JplPolicy' if security_environment in ('JPL', 'JPL-public') else 'AWS::IAM::ManagedPolicy' }} + Properties: + {% if security_environment in ('JPL', 'JPL-public') %} + ServiceToken: !ImportValue Custom::JplPolicy::ServiceToken + Path: /account-managed/hyp3/ + {% endif %} + PolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: + - logs:CreateLogStream + - logs:PutLogEvents + Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/*" + + Lambda: + Type: AWS::Lambda::Function + Properties: + Code: src/ + Handler: set_batch_overrides.lambda_handler + MemorySize: 128 + Role: !GetAtt Role.Arn + Runtime: python3.9 + Timeout: 30 + {% if security_environment == 'EDC' %} + VpcConfig: + SecurityGroupIds: + - !Ref SecurityGroupId + SubnetIds: !Ref SubnetIds + {% endif %} diff --git a/apps/set-batch-overrides/src/set_batch_overrides.py b/apps/set-batch-overrides/src/set_batch_overrides.py new file mode 100644 index 000000000..7d3dbd67e --- /dev/null +++ b/apps/set-batch-overrides/src/set_batch_overrides.py @@ -0,0 +1,86 @@ +AUTORIFT_S2_MEMORY = '7875' +AUTORIFT_LANDSAT_MEMORY = '15750' +RTC_GAMMA_10M_MEMORY = '63200' +WATER_MAP_10M_MEMORY = '126000' + +INSAR_ISCE_BURST_MEMORY_8G = '7500' +INSAR_ISCE_BURST_MEMORY_16G = '15500' +INSAR_ISCE_BURST_MEMORY_32G = '31500' +INSAR_ISCE_BURST_MEMORY_64G = '63500' +INSAR_ISCE_BURST_MEMORY_128G = '127500' + +# vCPU = Memory/8 for r6 instance types +INSAR_ISCE_BURST_OMP_NUM_THREADS = { + INSAR_ISCE_BURST_MEMORY_8G: '1', + INSAR_ISCE_BURST_MEMORY_16G: '2', + INSAR_ISCE_BURST_MEMORY_32G: '4', + INSAR_ISCE_BURST_MEMORY_64G: '8', + INSAR_ISCE_BURST_MEMORY_128G: '16', +} + + +def get_container_overrides(memory: str, omp_num_threads: str = None) -> dict: + container_overrides = { + 'ResourceRequirements': [ + { + 'Type': 'MEMORY', + 'Value': memory, + } + ] + } + if omp_num_threads is not None: + container_overrides['Environment'] = [{'Name': 'OMP_NUM_THREADS', 'Value': omp_num_threads}] + return container_overrides + + +def get_insar_isce_burst_memory(job_parameters: dict) -> str: + looks = job_parameters['looks'] + bursts = len(job_parameters['reference'].split(' ')) + if looks == '5x1': + if bursts < 2: + return INSAR_ISCE_BURST_MEMORY_8G + if bursts < 4: + return INSAR_ISCE_BURST_MEMORY_16G + if bursts < 11: + return INSAR_ISCE_BURST_MEMORY_32G + if bursts < 25: + return INSAR_ISCE_BURST_MEMORY_64G + if bursts < 31: + return INSAR_ISCE_BURST_MEMORY_128G + if looks == '10x2': + if bursts < 8: + return INSAR_ISCE_BURST_MEMORY_8G + if bursts < 21: + return INSAR_ISCE_BURST_MEMORY_16G + if bursts < 31: + return INSAR_ISCE_BURST_MEMORY_32G + if looks == '20x4': + if bursts < 23: + return INSAR_ISCE_BURST_MEMORY_8G + if bursts < 31: + return INSAR_ISCE_BURST_MEMORY_16G + raise ValueError(f'No memory value for {bursts} bursts and {looks} looks') + + +def lambda_handler(event: dict, _) -> dict: + job_type, job_parameters = event['job_type'], event['job_parameters'] + + # TODO https://github.com/ASFHyP3/hyp3/issues/2442 rename to INSAR_ISCE_BURST after the two burst types are merged + if job_type == 'INSAR_ISCE_MULTI_BURST': + memory = get_insar_isce_burst_memory(job_parameters) + omp_num_threads = INSAR_ISCE_BURST_OMP_NUM_THREADS[memory] + return get_container_overrides(memory, omp_num_threads) + + if job_type == 'AUTORIFT' and job_parameters['granules'].startswith('S2'): + return get_container_overrides(AUTORIFT_S2_MEMORY) + + if job_type == 'AUTORIFT' and job_parameters['granules'].startswith('L'): + return get_container_overrides(AUTORIFT_LANDSAT_MEMORY) + + if job_type == 'RTC_GAMMA' and job_parameters['resolution'] in ['10', '20']: + return get_container_overrides(RTC_GAMMA_10M_MEMORY) + + if job_type in ['WATER_MAP', 'WATER_MAP_EQ'] and job_parameters['resolution'] in ['10', '20']: + return get_container_overrides(WATER_MAP_10M_MEMORY) + + return {} diff --git a/apps/step-function.json.j2 b/apps/step-function.json.j2 index 68506f211..bc168208d 100644 --- a/apps/step-function.json.j2 +++ b/apps/step-function.json.j2 @@ -36,153 +36,32 @@ "Type": "Pass", "InputPath": "$.job_id", "ResultPath": "$.job_parameters.bucket_prefix", - "Next": "INSPECT_MEMORY_REQUIREMENTS" + "Next": "SET_BATCH_OVERRIDES" }, - "INSPECT_MEMORY_REQUIREMENTS": { - "Type": "Choice", - "Choices": [ - { - "And": [ - { - "Variable": "$.job_type", - "StringEquals": "AUTORIFT" - }, - { - "Variable": "$.job_parameters.granules", - "StringMatches": "S2*" - } - ], - "Next": "USE_SENTINEL2_MEMORY" - }, - { - "And": [ - { - "Variable": "$.job_type", - "StringEquals": "AUTORIFT" - }, - { - "Variable": "$.job_parameters.granules", - "StringMatches": "L*" - } - ], - "Next": "USE_LANDSAT_MEMORY" - }, + "SET_BATCH_OVERRIDES": { + "Type": "Task", + "Resource": "${SetBatchOverridesLambdaArn}", + "Parameters": { + "job_type.$": "$.job_type", + "job_parameters.$": "$.job_parameters" + }, + "Retry": [ { - "And": [ - { - "Variable": "$.job_type", - "StringEquals": "RTC_GAMMA" - }, - { - "Variable": "$.job_parameters.resolution", - "IsPresent": true - }, - { - "Or": [ - { - "Variable": "$.job_parameters.resolution", - "StringEquals": "10" - }, - { - "Variable": "$.job_parameters.resolution", - "StringEquals": "20" - } - ] - } + "ErrorEquals": [ + "States.ALL" ], - "Next": "USE_10M_RTC_MEMORY" - }, + "MaxAttempts": 2 + } + ], + "Catch": [ { - "And": [ - { - "Or": [ - { - "Variable": "$.job_type", - "StringEquals": "WATER_MAP" - }, - { - "Variable": "$.job_type", - "StringEquals": "WATER_MAP_EQ" - } - ] - }, - { - "Variable": "$.job_parameters.resolution", - "IsPresent": true - }, - { - "Or": [ - { - "Variable": "$.job_parameters.resolution", - "StringEquals": "10" - }, - { - "Variable": "$.job_parameters.resolution", - "StringEquals": "20" - } - ] - } + "ErrorEquals": [ + "States.ALL" ], - "Next": "USE_10M_WATER_MAP_MEMORY" + "Next": "JOB_FAILED", + "ResultPath": "$.container_overrides" } ], - "Default": "USE_DEFAULT_MEMORY" - }, - "USE_SENTINEL2_MEMORY": { - "Type": "Pass", - "Result": { - "ResourceRequirements": [ - { - "Type": "MEMORY", - "Value": "7875" - } - ] - }, - "ResultPath": "$.container_overrides", - "Next": "INSPECT_JOB_TYPE" - }, - "USE_LANDSAT_MEMORY": { - "Type": "Pass", - "Result": { - "ResourceRequirements": [ - { - "Type": "MEMORY", - "Value": "15750" - } - ] - }, - "ResultPath": "$.container_overrides", - "Next": "INSPECT_JOB_TYPE" - }, - "USE_10M_RTC_MEMORY": { - "Type": "Pass", - "Result": { - "ResourceRequirements": [ - { - "Type": "MEMORY", - "Value": "63200" - } - ] - }, - "ResultPath": "$.container_overrides", - "Next": "INSPECT_JOB_TYPE" - }, - "USE_10M_WATER_MAP_MEMORY": { - "Type": "Pass", - "Result": { - "ResourceRequirements": [ - { - "Type": "MEMORY", - "Value": "126000" - } - ] - }, - "ResultPath": "$.container_overrides", - "Next": "INSPECT_JOB_TYPE" - }, - "USE_DEFAULT_MEMORY": { - "Type": "Pass", - "Result": {}, "ResultPath": "$.container_overrides", "Next": "INSPECT_JOB_TYPE" }, diff --git a/apps/workflow-cf.yml.j2 b/apps/workflow-cf.yml.j2 index ec740395b..0859387c6 100644 --- a/apps/workflow-cf.yml.j2 +++ b/apps/workflow-cf.yml.j2 @@ -103,6 +103,7 @@ Resources: {% endfor %} {% endfor %} UpdateDBLambdaArn: !GetAtt UpdateDB.Outputs.LambdaArn + SetBatchOverridesLambdaArn: !GetAtt SetBatchOverrides.Outputs.LambdaArn GetFilesLambdaArn: !GetAtt GetFiles.Outputs.LambdaArn CheckProcessingTimeLambdaArn: !GetAtt CheckProcessingTime.Outputs.LambdaArn UploadLogLambdaArn: !GetAtt UploadLog.Outputs.LambdaArn @@ -162,6 +163,7 @@ Resources: Action: lambda:InvokeFunction Resource: - !GetAtt UpdateDB.Outputs.LambdaArn + - !GetAtt SetBatchOverrides.Outputs.LambdaArn - !GetAtt GetFiles.Outputs.LambdaArn - !GetAtt CheckProcessingTime.Outputs.LambdaArn - !GetAtt UploadLog.Outputs.LambdaArn @@ -177,6 +179,16 @@ Resources: {% endif %} TemplateURL: update-db/update-db-cf.yml + SetBatchOverrides: + Type: AWS::CloudFormation::Stack + Properties: + {% if security_environment == 'EDC' %} + Parameters: + SecurityGroupId: !Ref SecurityGroupId + SubnetIds: !Join [",", !Ref SubnetIds] + {% endif %} + TemplateURL: set-batch-overrides/set-batch-overrides-cf.yml + GetFiles: Type: AWS::CloudFormation::Stack Properties: diff --git a/job_spec/INSAR_ISCE_MULTI_BURST.yml b/job_spec/INSAR_ISCE_MULTI_BURST.yml new file mode 100644 index 000000000..e6b3a89e5 --- /dev/null +++ b/job_spec/INSAR_ISCE_MULTI_BURST.yml @@ -0,0 +1,88 @@ +INSAR_ISCE_MULTI_BURST: + required_parameters: + - reference + - secondary + parameters: + reference: + default: '""' + api_schema: + type: array + minItems: 1 + maxItems: 30 + # TODO: provide an example with multiple bursts + example: + - S1_136231_IW2_20200604T022312_VV_7C85-BURST + items: + description: Name of the reference Sentinel-1 SLC IW burst granule to process + type: string + pattern: '^S1_\d{6}_IW\d_\d{8}T\d{6}_[VH]{2}_([\dA-F]){4}-BURST$' + minLength: 43 + maxLength: 43 + example: S1_136231_IW2_20200604T022312_VV_7C85-BURST + secondary: + default: '""' + api_schema: + type: array + minItems: 1 + maxItems: 30 + example: + - S1_136231_IW2_20200616T022313_VV_5D11-BURST + items: + description: Name of the secondary Sentinel-1 SLC IW burst granule to process + type: string + pattern: '^S1_\d{6}_IW\d_\d{8}T\d{6}_[VH]{2}_([\dA-F]){4}-BURST$' + minLength: 43 + maxLength: 43 + example: S1_136231_IW2_20200616T022313_VV_5D11-BURST + bucket_prefix: + default: '""' + apply_water_mask: + api_schema: + description: Sets pixels over coastal and large inland waterbodies as invalid for phase unwrapping. + default: false + type: boolean + looks: + api_schema: + description: Number of looks to take in range and azimuth + type: string + default: 20x4 + enum: + - 20x4 + - 10x2 + - 5x1 + cost_profiles: + EDC: + cost: 1.0 + DEFAULT: + cost: 1.0 + validators: + - check_dem_coverage + - check_valid_polarizations + - check_same_burst_ids + - check_not_antimeridian + compute_environment: + name: 'Default' + tasks: + - name: '' + image: ghcr.io/asfhyp3/hyp3-isce2 + command: + - ++process + - insar_tops_burst + - --bucket + - '!Ref Bucket' + - --bucket-prefix + - Ref::bucket_prefix + - --apply-water-mask + - Ref::apply_water_mask + - --looks + - Ref::looks + - --reference + - Ref::reference + - --secondary + - Ref::secondary + timeout: 126000 # 35 hours + vcpu: 1 + memory: 4 # Memory is always overridden by the step function + secrets: + - EARTHDATA_USERNAME + - EARTHDATA_PASSWORD diff --git a/tests/test_api/test_util.py b/tests/test_api/test_util.py index 033911d8d..ba5b37bbb 100644 --- a/tests/test_api/test_util.py +++ b/tests/test_api/test_util.py @@ -13,11 +13,14 @@ def test_get_granules(): {'job_parameters': {'granules': ['A']}}, {'job_parameters': {'granules': ['B']}}, {'job_parameters': {'granules': ['C', 'D']}}, + {'job_parameters': {'secondary': ['J']}}, {'job_parameters': {'granules': ['C', 'D', 'E']}}, {'job_parameters': {'granules': ['F', 'F']}}, + {'job_parameters': {'reference': ['G', 'H', 'H', 'I']}}, + {'job_parameters': {'secondary': []}}, {'job_parameters': {}}, ] - ) == {'A', 'B', 'C', 'D', 'E', 'F'} + ) == {'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J'} def test_serialize_token(): diff --git a/tests/test_api/test_validation.py b/tests/test_api/test_validation.py index 3ab648bae..d67fbfce3 100644 --- a/tests/test_api/test_validation.py +++ b/tests/test_api/test_validation.py @@ -18,11 +18,11 @@ def test_not_antimeridian(): bad_rect = rectangle(51.7, 51.3, 179.7, -179.3) good_granules = [{'polygon': good_rect, 'name': 'good'}, {'polygon': good_rect, 'name': 'good'}] - validation.check_not_antimeridian(good_granules) + validation.check_not_antimeridian({}, good_granules) bad_granules = [{'polygon': good_rect, 'name': 'good'}, {'polygon': bad_rect, 'name': 'bad'}] with raises(validation.GranuleValidationError, match=r'.*crosses the antimeridian.*'): - validation.check_not_antimeridian(bad_granules) + validation.check_not_antimeridian({}, bad_granules) def test_has_sufficient_coverage(): @@ -84,75 +84,218 @@ def test_check_dem_coverage(): covered2 = {'name': 'covered2', 'polygon': rectangle(-62, -90, 180, -180)} not_covered = {'name': 'not_covered', 'polygon': rectangle(-20, -30, 70, 100)} - validation.check_dem_coverage([]) - validation.check_dem_coverage([covered1]) - validation.check_dem_coverage([covered2]) - validation.check_dem_coverage([covered1, covered2]) + validation.check_dem_coverage({}, []) + validation.check_dem_coverage({}, [covered1]) + validation.check_dem_coverage({}, [covered2]) + validation.check_dem_coverage({}, [covered1, covered2]) with raises(validation.GranuleValidationError) as e: - validation.check_dem_coverage([not_covered]) + validation.check_dem_coverage({}, [not_covered]) assert 'not_covered' in str(e) with raises(validation.GranuleValidationError) as e: - validation.check_dem_coverage([covered1, not_covered]) + validation.check_dem_coverage({}, [covered1, not_covered]) assert 'not_covered' in str(e) assert 'covered1' not in str(e) def test_check_same_burst_ids(): - valid_case = [ + valid_jobs = [ { - 'name': 'S1_136231_IW2_20200604T022312_VV_7C85-BURST' + 'job_parameters': { + 'reference': ['S1_136231_IW2_20200604T022312_VV_7C85-BURST'], + 'secondary': ['S1_136231_IW2_20200616T022313_VV_5D11-BURST'] + } + }, + { + 'job_parameters': { + 'reference': [ + 'S1_136231_IW2_20200604T022312_VV_7C85-BURST', + 'S1_136232_IW2_20200616T022315_VV_5D11-BURST' + ], + 'secondary': [ + 'S1_136231_IW2_20200616T022313_VV_5411-BURST', + 'S1_136232_IW2_20200616T022345_VV_5D13-BURST' + ] + } }, { - 'name': 'S1_136231_IW2_20200616T022313_VV_5D11-BURST' + 'job_parameters': { + 'reference': [ + 'S1_136231_IW2_20200604T022312_VV_7C85-BURST', + 'S1_136231_IW3_20200616T022315_VV_5D11-BURST' + ], + 'secondary': [ + 'S1_136231_IW2_20200616T022313_VV_5411-BURST', + 'S1_136231_IW3_20200616T022345_VV_5D13-BURST' + ] + } } ] - invalid_case = [ + invalid_job_different_lengths = { + 'job_parameters': { + 'reference': ['S1_136231_IW2_20200604T022312_VV_7C85-BURST'], + 'secondary': [ + 'S1_136232_IW2_20200616T022313_VV_5D11-BURST', + 'S1_136233_IW2_20200616T022313_VV_5D11-BURST' + ] + } + } + invalid_jobs_not_matching = [ { - 'name': 'S1_136231_IW2_20200604T022312_VV_7C85-BURST' + 'job_parameters': { + 'reference': ['S1_136231_IW2_20200604T022312_VV_7C85-BURST'], + 'secondary': ['S1_136232_IW2_20200616T022313_VV_5D11-BURST'] + } }, { - 'name': 'S1_136232_IW2_20200616T022313_HH_5D11-BURST' + 'job_parameters': { + 'reference': [ + 'S1_136231_IW2_20200604T022312_VV_7C85-BURST', + 'S1_136232_IW2_20200604T123455_VV_ABC5-BURST' + ], + 'secondary': [ + 'S1_136231_IW2_20200617T022313_VV_5D11-BURST', + 'S1_136233_IW2_20200617T123213_VV_5E13-BURST' + ] + } + }, + { + 'job_parameters': { + 'reference': [ + 'S1_136232_IW2_20200604T022312_VV_7C85-BURST', + 'S1_136231_IW2_20200604T123455_VV_ABC5-BURST' + ], + 'secondary': [ + 'S1_136231_IW2_20200617T022313_VV_5D11-BURST', + 'S1_136233_IW2_20200617T123213_VV_5E13-BURST' + ] + } } ] - - validation.check_same_burst_ids(valid_case) - with raises(validation.GranuleValidationError, match=r'.*do not have the same burst ID.*'): - validation.check_same_burst_ids(invalid_case) + invalid_jobs_duplicate = [ + { + 'job_parameters': { + 'reference': [ + 'S1_136231_IW2_20200604T022312_VV_7C85-BURST', + 'S1_136231_IW2_20200604T123455_VV_ABC5-BURST' + ], + 'secondary': [ + 'S1_136231_IW2_20200617T022313_VV_5D11-BURST', + 'S1_136231_IW2_20200617T123213_VV_5E13-BURST' + ] + } + }, + { + 'job_parameters': { + 'reference': [ + 'S1_136231_IW2_20200604T022312_VV_7C85-BURST', + 'S1_136231_IW2_20200604T123455_VV_ABC5-BURST', + 'S1_136232_IW2_20200604T125455_VV_ABC6-BURST', + ], + 'secondary': [ + 'S1_136231_IW2_20200617T022313_VV_5D11-BURST', + 'S1_136231_IW2_20200617T123213_VV_5E13-BURST', + 'S1_136232_IW2_20200604T123475_VV_ABC7-BURST', + ] + } + } + ] + for valid_job in valid_jobs: + validation.check_same_burst_ids(valid_job, {}) + with raises(validation.GranuleValidationError, match=r'.*Number of reference and secondary scenes must match*'): + validation.check_same_burst_ids(invalid_job_different_lengths, {}) + for invalid_job in invalid_jobs_not_matching: + with raises(validation.GranuleValidationError, match=r'.*Burst IDs do not match*'): + validation.check_same_burst_ids(invalid_job, {}) + for invalid_job in invalid_jobs_duplicate: + with raises(validation.GranuleValidationError, match=r'.*The requested scenes have more than 1 pair*'): + validation.check_same_burst_ids(invalid_job, {}) def test_check_valid_polarizations(): - valid_case = [ + valid_jobs = [ { - 'name': 'S1_136231_IW2_20200604T022312_VV_7C85-BURST' + 'job_parameters': { + 'reference': ['S1_136231_IW2_20200604T022312_VV_7C85-BURST'], + 'secondary': ['S1_136231_IW2_20200616T022313_VV_5D11-BURST'] + } }, { - 'name': 'S1_136231_IW2_20200616T022313_VV_5D11-BURST' + 'job_parameters': { + 'reference': [ + 'S1_136231_IW2_20200604T022312_HH_7C85-BURST', + 'S1_136232_IW2_20200616T022315_HH_5D11-BURST' + ], + 'secondary': [ + 'S1_136231_IW2_20200616T022313_HH_5411-BURST', + 'S1_136232_IW2_20200616T022345_HH_5D13-BURST' + ] + } } ] - different_polarizations = [ + invalid_jobs_not_matching = [ { - 'name': 'S1_136231_IW2_20200604T022312_VV_7C85-BURST' + 'job_parameters': { + 'reference': ['S1_136231_IW2_20200604T022312_VV_7C85-BURST'], + 'secondary': ['S1_136232_IW2_20200616T022313_HH_5D11-BURST'] + } }, { - 'name': 'S1_136231_IW2_20200616T022313_HH_5D11-BURST' + 'job_parameters': { + 'reference': [ + 'S1_136231_IW2_20200604T022312_VV_7C85-BURST', + 'S1_136232_IW2_20200604T123455_VV_ABC5-BURST' + ], + 'secondary': [ + 'S1_136231_IW2_20200617T022313_VV_5D11-BURST', + 'S1_136233_IW2_20200617T123213_HH_5E13-BURST' + ] + } + }, + { + 'job_parameters': { + 'reference': [ + 'S1_136232_IW2_20200604T022312_VV_7C85-BURST', + 'S1_136231_IW2_20200604T123455_HH_ABC5-BURST' + ], + 'secondary': [ + 'S1_136231_IW2_20200617T022313_VV_5D11-BURST', + 'S1_136233_IW2_20200617T123213_HH_5E13-BURST' + ] + } } ] - unsupported_polarizations = [ + invalid_jobs_unsupported = [ { - 'name': 'S1_136231_IW2_20200604T022312_VH_7C85-BURST' + 'job_parameters': { + 'reference': ['S1_136231_IW2_20200604T022312_VH_7C85-BURST'], + 'secondary': ['S1_136231_IW2_20200617T022313_VH_5D11-BURST'] + } }, { - 'name': 'S1_136231_IW2_20200616T022313_VH_5D11-BURST' + 'job_parameters': { + 'reference': [ + 'S1_136231_IW2_20200604T022312_HV_7C85-BURST', + 'S1_136231_IW2_20200604T123455_HV_ABC5-BURST', + 'S1_136232_IW2_20200604T125455_HV_ABC6-BURST', + ], + 'secondary': [ + 'S1_136231_IW2_20200617T022313_HV_5D11-BURST', + 'S1_136231_IW2_20200617T123213_HV_5E13-BURST', + 'S1_136232_IW2_20200604T123475_HV_ABC7-BURST', + ] + } } ] - - validation.check_valid_polarizations(valid_case) - with raises(validation.GranuleValidationError, match=r'.*do not have the same polarization.*'): - validation.check_valid_polarizations(different_polarizations) - with raises(validation.GranuleValidationError, match=r'.*Only VV and HH.*'): - validation.check_valid_polarizations(unsupported_polarizations) + for valid_job in valid_jobs: + validation.check_valid_polarizations(valid_job, {}) + for invalid_job in invalid_jobs_not_matching: + with raises(validation.GranuleValidationError, match=r'.*need to have the same polarization*'): + validation.check_valid_polarizations(invalid_job, {}) + for invalid_job in invalid_jobs_unsupported: + with raises(validation.GranuleValidationError, match=r'.*currently supported*'): + validation.check_valid_polarizations(invalid_job, {}) def test_check_granules_exist(): @@ -230,6 +373,16 @@ def test_validate_jobs(): granule_with_dem_coverage = 'S1A_IW_SLC__1SSV_20150621T120220_20150621T120232_006471_008934_72D8' granule_without_dem_coverage = 'S1A_IW_GRDH_1SDV_20201219T222530_20201219T222555_035761_042F72_8378' + valid_burst_pair = ( + 'S1_136231_IW2_20200604T022312_VV_7C85-BURST', + 'S1_136231_IW2_20200616T022313_VV_5D11-BURST' + ) + + invalid_burst_pair = ( + 'S1_136231_IW2_20200616T022313_VV_5D11-BURST', + 'S1_136232_IW2_20200604T022315_VV_7C85-BURST' + ) + granule_polygon_pairs = [ (granule_with_dem_coverage, [['13.705972 -91.927132 14.452647 -91.773392 14.888498 -94.065727 ' @@ -270,6 +423,23 @@ def test_validate_jobs(): 'job_type': 'ARIA_RAIDER', 'job_parameters': {} }, + { + # TODO https://github.com/ASFHyP3/hyp3/issues/2442 + # rename to INSAR_ISCE_BURST after the two burst types are merged + 'job_type': 'INSAR_ISCE_MULTI_BURST', + 'job_parameters': { + 'reference': [valid_burst_pair[0]], + 'secondary': [valid_burst_pair[1]] + } + }, + { + # TODO https://github.com/ASFHyP3/hyp3/issues/2442 + # remove this test case after the two burst types are merged + 'job_type': 'INSAR_ISCE_BURST', + 'job_parameters': { + 'granules': [valid_burst_pair[0], valid_burst_pair[1]] + } + } ] validation.validate_jobs(jobs) @@ -294,3 +464,30 @@ def test_validate_jobs(): ] with raises(validation.GranuleValidationError): validation.validate_jobs(jobs) + + jobs = [ + { + # TODO https://github.com/ASFHyP3/hyp3/issues/2442 + # rename to INSAR_ISCE_BURST after the two burst types are merged + 'job_type': 'INSAR_ISCE_MULTI_BURST', + 'job_parameters': { + 'reference': [invalid_burst_pair[0]], + 'secondary': [invalid_burst_pair[1]] + } + } + ] + with raises(validation.GranuleValidationError): + validation.validate_jobs(jobs) + + # TODO https://github.com/ASFHyP3/hyp3/issues/2442 + # remove this test case after the two burst types are merged + jobs = [ + { + 'job_type': 'INSAR_ISCE_BURST', + 'job_parameters': { + 'granules': [invalid_burst_pair[0], invalid_burst_pair[1]] + } + } + ] + with raises(validation.GranuleValidationError): + validation.validate_jobs(jobs) diff --git a/tests/test_set_batch_overrides.py b/tests/test_set_batch_overrides.py new file mode 100644 index 000000000..6df1df591 --- /dev/null +++ b/tests/test_set_batch_overrides.py @@ -0,0 +1,276 @@ +import pytest + +from set_batch_overrides import ( + AUTORIFT_LANDSAT_MEMORY, + AUTORIFT_S2_MEMORY, + INSAR_ISCE_BURST_MEMORY_128G, + INSAR_ISCE_BURST_MEMORY_16G, + INSAR_ISCE_BURST_MEMORY_32G, + INSAR_ISCE_BURST_MEMORY_64G, + INSAR_ISCE_BURST_MEMORY_8G, + RTC_GAMMA_10M_MEMORY, + WATER_MAP_10M_MEMORY, + lambda_handler, +) + + +def mock_insar_isce_burst_job(looks: str, bursts: int) -> dict: + return { + # TODO https://github.com/ASFHyP3/hyp3/issues/2442 + # rename to INSAR_ISCE_BURST after the two burst types are merged + 'job_type': 'INSAR_ISCE_MULTI_BURST', + 'job_parameters': { + 'looks': looks, + 'reference': ' '.join('foo' for _ in range(bursts)), + } + } + + +def test_set_batch_overrides_default(): + assert lambda_handler( + { + 'job_type': 'foo', + 'job_parameters': {}, + }, + None, + ) == {} + + +def test_set_batch_overrides_insar_isce_burst_5x1(): + assert lambda_handler(mock_insar_isce_burst_job('5x1', bursts=1), None) == { + 'ResourceRequirements': [ + { + 'Type': 'MEMORY', + 'Value': INSAR_ISCE_BURST_MEMORY_8G, + } + ], + 'Environment': [{'Name': 'OMP_NUM_THREADS', 'Value': '1'}], + } + for n in range(2, 4): + assert lambda_handler(mock_insar_isce_burst_job('5x1', bursts=n), None) == { + 'ResourceRequirements': [ + { + 'Type': 'MEMORY', + 'Value': INSAR_ISCE_BURST_MEMORY_16G, + } + ], + 'Environment': [{'Name': 'OMP_NUM_THREADS', 'Value': '2'}], + } + for n in range(4, 11): + assert lambda_handler(mock_insar_isce_burst_job('5x1', bursts=n), None) == { + 'ResourceRequirements': [ + { + 'Type': 'MEMORY', + 'Value': INSAR_ISCE_BURST_MEMORY_32G, + } + ], + 'Environment': [{'Name': 'OMP_NUM_THREADS', 'Value': '4'}], + } + for n in range(11, 25): + assert lambda_handler(mock_insar_isce_burst_job('5x1', bursts=n), None) == { + 'ResourceRequirements': [ + { + 'Type': 'MEMORY', + 'Value': INSAR_ISCE_BURST_MEMORY_64G, + } + ], + 'Environment': [{'Name': 'OMP_NUM_THREADS', 'Value': '8'}], + } + for n in range(25, 31): + assert lambda_handler(mock_insar_isce_burst_job('5x1', bursts=n), None) == { + 'ResourceRequirements': [ + { + 'Type': 'MEMORY', + 'Value': INSAR_ISCE_BURST_MEMORY_128G, + } + ], + 'Environment': [{'Name': 'OMP_NUM_THREADS', 'Value': '16'}], + } + + +def test_set_batch_overrides_insar_isce_burst_10x2(): + for n in range(1, 8): + assert lambda_handler(mock_insar_isce_burst_job('10x2', bursts=n), None) == { + 'ResourceRequirements': [ + { + 'Type': 'MEMORY', + 'Value': INSAR_ISCE_BURST_MEMORY_8G, + } + ], + 'Environment': [{'Name': 'OMP_NUM_THREADS', 'Value': '1'}], + } + for n in range(8, 21): + assert lambda_handler(mock_insar_isce_burst_job('10x2', bursts=n), None) == { + 'ResourceRequirements': [ + { + 'Type': 'MEMORY', + 'Value': INSAR_ISCE_BURST_MEMORY_16G, + } + ], + 'Environment': [{'Name': 'OMP_NUM_THREADS', 'Value': '2'}], + } + for n in range(21, 31): + assert lambda_handler(mock_insar_isce_burst_job('10x2', bursts=n), None) == { + 'ResourceRequirements': [ + { + 'Type': 'MEMORY', + 'Value': INSAR_ISCE_BURST_MEMORY_32G, + } + ], + 'Environment': [{'Name': 'OMP_NUM_THREADS', 'Value': '4'}], + } + + +def test_set_batch_overrides_insar_isce_burst_20x4(): + for n in range(1, 23): + assert lambda_handler(mock_insar_isce_burst_job('20x4', bursts=n), None) == { + 'ResourceRequirements': [ + { + 'Type': 'MEMORY', + 'Value': INSAR_ISCE_BURST_MEMORY_8G, + } + ], + 'Environment': [{'Name': 'OMP_NUM_THREADS', 'Value': '1'}], + } + for n in range(23, 31): + assert lambda_handler(mock_insar_isce_burst_job('20x4', bursts=n), None) == { + 'ResourceRequirements': [ + { + 'Type': 'MEMORY', + 'Value': INSAR_ISCE_BURST_MEMORY_16G, + } + ], + 'Environment': [{'Name': 'OMP_NUM_THREADS', 'Value': '2'}], + } + + +def test_set_batch_overrides_insar_isce_burst_value_error(): + with pytest.raises(ValueError, match=r'^No memory value for.*'): + lambda_handler(mock_insar_isce_burst_job('20x4', bursts=31), None) + + with pytest.raises(ValueError, match=r'^No memory value for.*'): + lambda_handler(mock_insar_isce_burst_job('foo', bursts=1), None) + + +def test_set_batch_overrides_autorift_s2(): + assert lambda_handler( + { + 'job_type': 'AUTORIFT', + 'job_parameters': {'granules': 'S2B_'}, + }, + None, + ) == { + 'ResourceRequirements': [ + { + 'Type': 'MEMORY', + 'Value': AUTORIFT_S2_MEMORY, + } + ] + } + + +def test_set_batch_overrides_autorift_landsat(): + assert lambda_handler( + { + 'job_type': 'AUTORIFT', + 'job_parameters': {'granules': 'LC08_'}, + }, + None, + ) == { + 'ResourceRequirements': [ + { + 'Type': 'MEMORY', + 'Value': AUTORIFT_LANDSAT_MEMORY, + } + ] + } + + +def test_set_batch_overrides_rtc_gamma_10m(): + assert lambda_handler( + { + 'job_type': 'RTC_GAMMA', + 'job_parameters': {'resolution': '10'}, + }, + None, + ) == { + 'ResourceRequirements': [ + { + 'Type': 'MEMORY', + 'Value': RTC_GAMMA_10M_MEMORY, + } + ] + } + assert lambda_handler( + { + 'job_type': 'RTC_GAMMA', + 'job_parameters': {'resolution': '20'}, + }, + None, + ) == { + 'ResourceRequirements': [ + { + 'Type': 'MEMORY', + 'Value': RTC_GAMMA_10M_MEMORY, + } + ] + } + + +def test_set_batch_overrides_water_map_10m(): + assert lambda_handler( + { + 'job_type': 'WATER_MAP', + 'job_parameters': {'resolution': '10'}, + }, + None, + ) == { + 'ResourceRequirements': [ + { + 'Type': 'MEMORY', + 'Value': WATER_MAP_10M_MEMORY, + } + ] + } + assert lambda_handler( + { + 'job_type': 'WATER_MAP', + 'job_parameters': {'resolution': '20'}, + }, + None, + ) == { + 'ResourceRequirements': [ + { + 'Type': 'MEMORY', + 'Value': WATER_MAP_10M_MEMORY, + } + ] + } + assert lambda_handler( + { + 'job_type': 'WATER_MAP_EQ', + 'job_parameters': {'resolution': '10'}, + }, + None, + ) == { + 'ResourceRequirements': [ + { + 'Type': 'MEMORY', + 'Value': WATER_MAP_10M_MEMORY, + } + ] + } + assert lambda_handler( + { + 'job_type': 'WATER_MAP_EQ', + 'job_parameters': {'resolution': '20'}, + }, + None, + ) == { + 'ResourceRequirements': [ + { + 'Type': 'MEMORY', + 'Value': WATER_MAP_10M_MEMORY, + } + ] + }