
Commit

Merge pull request #2465 from ASFHyP3/refactor-compute-envs
Compute Envs Must be Imported to Render
jtherrmann authored Oct 22, 2024
2 parents a77eddb + 631e8a6 commit d3afd83
Showing 24 changed files with 182 additions and 131 deletions.
9 changes: 1 addition & 8 deletions CHANGELOG.md
@@ -15,14 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- Changes to custom compute environments:
- Custom compute environments are now applied to individual job steps rather than to entire jobs. The `compute_environment` field is now provided at the step level rather than at the top level of the job spec.
-  - Custom compute environments can optionally be defined within `job_spec/config/compute_environments.yml`. Job steps can import these environments using the following syntax:
-    ```yaml
-    compute_environment:
-      import: MyComputeEnvironment
-    ```
-    If the `import` value is `Default`, then the job step uses the deployment's default compute environment.
-
-    The `compute_environment` field can still be used to define a custom compute environment directly within the job spec, as before.
+  - If the value of the `compute_environment` field is `Default`, then the step uses the deployment's default compute environment. Otherwise, the value must be the name of a custom compute environment defined in `job_spec/config/compute_environments.yml`.
- Other changes to the job spec syntax:
- The `tasks` field has been renamed to `steps`.
- Job parameters no longer contain a top-level `default` field. The `default` field within each parameter's `api_schema` mapping is still supported.
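To illustrate the new syntax the entry above describes, a job step now names its compute environment with a plain string. A minimal sketch (the step fields are abbreviated, and `MyComputeEnvironment` is a hypothetical name that would have to be defined in `job_spec/config/compute_environments.yml`):

```yaml
steps:
  - name: PROCESS                              # hypothetical step name
    command:
      - Ref::granules
    timeout: 10800
    compute_environment: MyComputeEnvironment  # or Default
    vcpu: 1
    memory: 7500
```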
3 changes: 2 additions & 1 deletion Makefile
@@ -1,4 +1,5 @@
API = ${PWD}/apps/api/src
+APPS = ${PWD}/apps
CHECK_PROCESSING_TIME = ${PWD}/apps/check-processing-time/src
GET_FILES = ${PWD}/apps/get-files/src
HANDLE_BATCH_EVENT = ${PWD}/apps/handle-batch-event/src
@@ -10,7 +11,7 @@ DISABLE_PRIVATE_DNS = ${PWD}/apps/disable-private-dns/src
UPDATE_DB = ${PWD}/apps/update-db/src
UPLOAD_LOG = ${PWD}/apps/upload-log/src
DYNAMO = ${PWD}/lib/dynamo
-export PYTHONPATH = ${API}:${CHECK_PROCESSING_TIME}:${GET_FILES}:${HANDLE_BATCH_EVENT}:${SET_BATCH_OVERRIDES}:${SCALE_CLUSTER}:${START_EXECUTION_MANAGER}:${START_EXECUTION_WORKER}:${DISABLE_PRIVATE_DNS}:${UPDATE_DB}:${UPLOAD_LOG}:${DYNAMO}
+export PYTHONPATH = ${API}:${CHECK_PROCESSING_TIME}:${GET_FILES}:${HANDLE_BATCH_EVENT}:${SET_BATCH_OVERRIDES}:${SCALE_CLUSTER}:${START_EXECUTION_MANAGER}:${START_EXECUTION_WORKER}:${DISABLE_PRIVATE_DNS}:${UPDATE_DB}:${UPLOAD_LOG}:${DYNAMO}:${APPS}


build: render
7 changes: 3 additions & 4 deletions apps/compute-cf.yml.j2
@@ -29,14 +29,14 @@ Outputs:
JobQueueArn:
Value: !Ref BatchJobQueue

-{% for name in compute_env_names %}
+{% for name in compute_envs %}
{{ name }}ComputeEnvironmentArn:
Value: !Ref {{ name }}ComputeEnvironment

{{ name }}JobQueueArn:
Value: !Ref {{ name }}JobQueue
-{% endfor %}

+{% endfor %}
TaskRoleArn:
Value: !GetAtt TaskRole.Arn

@@ -105,8 +105,7 @@ Resources:
SchedulingPolicy:
Type: AWS::Batch::SchedulingPolicy

-{% for env in compute_envs %}
-{% set name = env['name'] %}
+{% for name, env in compute_envs.items() %}
{% set instance_types = env['instance_types'].split(',') if 'instance_types' in env else '!Ref InstanceTypes' %}
{% set ami_id = env['ami_id'] if 'ami_id' in env else '!Ref AmiId' %}
{% set type = env['allocation_type'] if 'allocation_type' in env else 'SPOT' %}
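The templates can drop the separate `compute_env_names` variable because `compute_envs` is now a mapping from environment name to definition: iterating a dict in Jinja2 yields its keys, and `.items()` yields name/definition pairs. A minimal sketch of the pattern, with hypothetical `Env1` data:

```python
import jinja2

# Iterating a dict in a Jinja2 template yields its keys, so
# '{% for name in compute_envs %}' and '{% for name, env in compute_envs.items() %}'
# both work against the new dict-shaped compute_envs.
template = jinja2.Template(
    '{% for name, env in compute_envs.items() %}'
    '{{ name }}: {{ env.get("allocation_type", "SPOT") }}\n'
    '{% endfor %}'
)
print(template.render(compute_envs={'Env1': {'allocation_type': 'EC2'}}))  # Env1: EC2
```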
4 changes: 2 additions & 2 deletions apps/handle-batch-event/handle-batch-event-cf.yml.j2
@@ -5,7 +5,7 @@ Parameters:
JobQueueArn:
Type: String

-{% for name in compute_env_names %}
+{% for name in compute_envs %}
{{ name }}JobQueueArn:
Type: String
{% endfor %}
@@ -100,7 +100,7 @@ Resources:
detail:
jobQueue:
- !Ref JobQueueArn
-{% for name in compute_env_names %}
+{% for name in compute_envs %}
- !Ref {{ name }}JobQueueArn
{% endfor %}
status:
6 changes: 3 additions & 3 deletions apps/main-cf.yml.j2
@@ -154,7 +154,7 @@ Resources:
Properties:
Parameters:
ComputeEnvironmentArn: !GetAtt Cluster.Outputs.ComputeEnvironmentArn
-{% for name in compute_env_names %}
+{% for name in compute_envs %}
{{ name }}ComputeEnvironmentArn: !GetAtt Cluster.Outputs.{{ name }}ComputeEnvironmentArn
{% endfor %}
DefaultMaxvCpus: !Ref DefaultMaxvCpus
@@ -172,7 +172,7 @@
Properties:
Parameters:
JobQueueArn: !GetAtt Cluster.Outputs.JobQueueArn
-{% for name in compute_env_names %}
+{% for name in compute_envs %}
{{ name }}JobQueueArn: !GetAtt Cluster.Outputs.{{ name }}JobQueueArn
{% endfor %}
JobsTable: !Ref JobsTable
@@ -187,7 +187,7 @@
Properties:
Parameters:
JobQueueArn: !GetAtt Cluster.Outputs.JobQueueArn
-{% for name in compute_env_names %}
+{% for name in compute_envs %}
{{ name }}JobQueueArn: !GetAtt Cluster.Outputs.{{ name }}JobQueueArn
{% endfor %}
TaskRoleArn: !GetAtt Cluster.Outputs.TaskRoleArn
97 changes: 33 additions & 64 deletions apps/render_cf.py
@@ -1,7 +1,6 @@
import argparse
import json
from pathlib import Path
-from typing import Optional

import jinja2
import yaml
@@ -53,7 +52,7 @@ def get_state_for_job_step(step: dict, index: int, next_state_name: str, job_spe


def get_map_state(job_spec: dict, step: dict) -> dict:
-item, items = parse_job_step_map(step['map'])
+item, items = parse_map_statement(step['map'])

batch_job_parameters = get_batch_job_parameters(job_spec, step, map_item=item)

@@ -86,11 +85,7 @@ def get_batch_submit_job_state(job_spec: dict, step: dict, filter_batch_params=F
batch_job_parameters = '$.batch_job_parameters'
parameters_key = 'Parameters.$'

-if 'import' in step['compute_environment']:
-    compute_environment = step['compute_environment']['import']
-else:
-    compute_environment = step['compute_environment']['name']
-
+compute_environment = step['compute_environment']
job_queue = 'JobQueueArn' if compute_environment == 'Default' else compute_environment + 'JobQueueArn'
return {
'Type': 'Task',
@@ -129,24 +124,28 @@ def get_batch_submit_job_state(job_spec: dict, step: dict, filter_batch_params=F
}


-def parse_job_step_map(step_map: str) -> tuple[str, str]:
-    tokens = step_map.split(' ')
-    assert len(tokens) == 4
-    assert tokens[0], tokens[2] == ('for', 'in')
+def parse_map_statement(map_statement: str) -> tuple[str, str]:
+    tokens = map_statement.split(' ')
+    if len(tokens) != 4:
+        raise ValueError(f'expected 4 tokens in map statement but got {len(tokens)}: {map_statement}')
+    if tokens[0] != 'for':
+        raise ValueError(f"expected 'for', got '{tokens[0]}': {map_statement}")
+    if tokens[2] != 'in':
+        raise ValueError(f"expected 'in', got '{tokens[2]}': {map_statement}")
return tokens[1], tokens[3]


def get_batch_job_parameters(job_spec: dict, step: dict, map_item: str = None) -> dict:
-job_params = ['bucket_prefix', *job_spec['parameters'].keys()]
+job_params = {'bucket_prefix', *job_spec['parameters'].keys()}
step_params = get_batch_param_names_for_job_step(step)
-batch_params = {
-    f'{param}.$': f'$.batch_job_parameters.{param}'
-    for param in job_params
-    if param in step_params
-}
-if map_item is not None:
-    assert map_item in step_params
-    batch_params[f'{map_item}.$'] = '$$.Map.Item.Value'
+batch_params = {}
+for param in step_params:
+    if param == map_item:
+        batch_params[f'{map_item}.$'] = '$$.Map.Item.Value'
+    else:
+        if param not in job_params:
+            raise ValueError(f"job parameter '{param}' has not been defined")
+        batch_params[f'{param}.$'] = f'$.batch_job_parameters.{param}'
return batch_params
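
For illustration, the new parser can be exercised directly; a self-contained sketch (the `granule`/`granules` names are hypothetical):

```python
def parse_map_statement(map_statement: str) -> tuple[str, str]:
    # Copied from the diff above so this sketch runs on its own.
    tokens = map_statement.split(' ')
    if len(tokens) != 4:
        raise ValueError(f'expected 4 tokens in map statement but got {len(tokens)}: {map_statement}')
    if tokens[0] != 'for':
        raise ValueError(f"expected 'for', got '{tokens[0]}': {map_statement}")
    if tokens[2] != 'in':
        raise ValueError(f"expected 'in', got '{tokens[2]}': {map_statement}")
    return tokens[1], tokens[3]


# A map statement like 'for granule in granules' yields the loop variable
# and the collection it iterates.
assert parse_map_statement('for granule in granules') == ('granule', 'granules')

# Malformed statements now raise ValueError with a useful message. The old
# `assert tokens[0], tokens[2] == ('for', 'in')` only asserted that tokens[0]
# was truthy (the second expression was parsed as the assert message), so it
# could never catch a wrong keyword.
try:
    parse_map_statement('granule in granules')
except ValueError as err:
    print(err)  # expected 4 tokens in map statement but got 3: granule in granules
```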


@@ -159,7 +158,7 @@ def get_batch_param_names_for_job_step(step: dict) -> set[str]:
}


-def render_templates(job_types, compute_envs, security_environment, api_name):
+def render_templates(job_types: dict, compute_envs: dict, security_environment: str, api_name: str):
job_states = get_states_for_jobs(job_types)

env = jinja2.Environment(
@@ -178,7 +177,6 @@ def render_templates(job_types, compute_envs, security_environment, api_name):
output = template.render(
job_types=job_types,
compute_envs=compute_envs,
-compute_env_names=[env['name'] for env in compute_envs],
security_environment=security_environment,
api_name=api_name,
json=json,
@@ -192,47 +190,18 @@ def render_templates(job_types, compute_envs, security_environment, api_name):
template_file.with_suffix('').write_text(output)


-def get_compute_environments(job_types: dict, compute_env_file: Optional[Path]) -> list[dict]:
-    compute_envs = []
-    compute_env_names = set()
-    compute_env_imports = set()
-    for _, job_spec in job_types.items():
-        for step in job_spec['steps']:
-            compute_env = step['compute_environment']
-            if 'name' in compute_env:
-                name = compute_env['name']
-                assert name != 'Default'
-                if name in compute_env_names:
-                    raise ValueError(
-                        f'Compute envs must have unique names but the following is defined more than once: {name}.'
-                    )
-                compute_envs.append(compute_env)
-                compute_env_names.add(name)
-            elif 'import' in compute_env and compute_env['import'] != 'Default':
-                compute_env_imports.add(compute_env['import'])
-            else:
-                assert compute_env['import'] == 'Default'
-
-    if compute_env_file:
-        compute_envs_from_file = yaml.safe_load(compute_env_file.read_text())['compute_environments']
-        for name in compute_envs_from_file:
-            assert name != 'Default'
-            if name in compute_env_names:
-                raise ValueError(
-                    f'Compute envs must have unique names but the following is defined more than once: {name}.'
-                )
-            compute_env = compute_envs_from_file[name]
-            compute_env['name'] = name
-            compute_envs.append(compute_env)
-            compute_env_names.add(name)
-
-        for name in compute_env_imports:
-            if name not in compute_envs_from_file:
-                raise ValueError(
-                    f'The following compute env is imported but not defined in the compute envs file: {name}.'
-                )
-    return compute_envs
+def get_compute_environments_for_deployment(job_types: dict, compute_env_file: Path) -> dict:
+    compute_envs = yaml.safe_load(compute_env_file.read_text())['compute_environments']
+
+    if 'Default' in compute_envs:
+        raise ValueError("'Default' is a reserved compute environment name")
+
+    return {
+        step['compute_environment']: compute_envs[step['compute_environment']]
+        for job_spec in job_types.values()
+        for step in job_spec['steps']
+        if step['compute_environment'] != 'Default'
+    }


def render_batch_params_by_job_type(job_types: dict) -> None:
@@ -294,7 +263,7 @@ def validate_job_spec(job_type: str, job_spec: dict) -> None:
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-j', '--job-spec-files', required=True, nargs='+', type=Path)
-parser.add_argument('-e', '--compute-environment-file', type=Path)
+parser.add_argument('-e', '--compute-environment-file', required=True, type=Path)
parser.add_argument('-s', '--security-environment', default='ASF', choices=['ASF', 'EDC', 'JPL', 'JPL-public'])
parser.add_argument('-n', '--api-name', required=True)
parser.add_argument('-c', '--cost-profile', default='DEFAULT', choices=['DEFAULT', 'EDC'])
@@ -311,7 +280,7 @@
for step in job_spec['steps']:
step['name'] = job_type + '_' + step['name'] if step['name'] else job_type

-compute_envs = get_compute_environments(job_types, args.compute_environment_file)
+compute_envs = get_compute_environments_for_deployment(job_types, args.compute_environment_file)

render_batch_params_by_job_type(job_types)
render_default_params_by_job_type(job_types)
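The refactored `get_compute_environments_for_deployment` expects every non-`Default` environment referenced by a job step to be defined in the compute environments file, whose top-level `compute_environments` key it reads via `yaml.safe_load`. A plausible sketch of such a file, using the `AriaAutorift` name from the job specs below and the optional keys that `apps/compute-cf.yml.j2` reads (`instance_types`, `ami_id`, `allocation_type`); the values shown are illustrative:

```yaml
# Sketch of job_spec/config/compute_environments.yml (illustrative values).
# Each key is an environment name that job steps may reference;
# 'Default' is reserved for the deployment's default environment.
compute_environments:
  AriaAutorift:
    instance_types: r6id.xlarge,r6id.2xlarge,r6id.4xlarge,r6id.8xlarge
    allocation_type: SPOT   # optional; compute-cf.yml.j2 falls back to SPOT
```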
8 changes: 4 additions & 4 deletions apps/scale-cluster/scale-cluster-cf.yml.j2
@@ -5,7 +5,7 @@ Parameters:
ComputeEnvironmentArn:
Type: String

-{% for name in compute_env_names %}
+{% for name in compute_envs %}
{{ name }}ComputeEnvironmentArn:
Type: String
{% endfor %}
@@ -86,7 +86,7 @@ Resources:
Action: batch:UpdateComputeEnvironment
Resource:
- !Ref ComputeEnvironmentArn
-{% for name in compute_env_names %}
+{% for name in compute_envs %}
- !Ref {{ name }}ComputeEnvironmentArn
{% endfor %}

@@ -127,7 +127,7 @@ Resources:
Targets:
- Arn: !GetAtt Lambda.Arn
Id: lambda
-{% for name in compute_env_names %}
+{% for name in compute_envs %}
- Arn: !GetAtt {{ name }}Lambda.Arn
Id: {{ name }}lambda
{% endfor %}
@@ -140,7 +140,7 @@
Principal: events.amazonaws.com
SourceArn: !GetAtt Schedule.Arn

-{% for name in compute_env_names %}
+{% for name in compute_envs %}
{{ name }}LogGroup:
Type: AWS::Logs::LogGroup
Properties:
6 changes: 3 additions & 3 deletions apps/workflow-cf.yml.j2
@@ -5,7 +5,7 @@ Parameters:
JobQueueArn:
Type: String

-{% for name in compute_env_names %}
+{% for name in compute_envs %}
{{ name }}JobQueueArn:
Type: String
{% endfor %}
@@ -89,7 +89,7 @@ Resources:
DefinitionS3Location: step-function.json
DefinitionSubstitutions:
JobQueueArn: !Ref JobQueueArn
-{% for name in compute_env_names %}
+{% for name in compute_envs %}
{{ name }}JobQueueArn: !Ref {{ name }}JobQueueArn
{% endfor %}
{% for job_type, job_spec in job_types.items() %}
@@ -134,7 +134,7 @@ Resources:
Action: batch:SubmitJob
Resource:
- !Ref JobQueueArn
-{% for name in compute_env_names %}
+{% for name in compute_envs %}
- !Ref {{ name }}JobQueueArn
{% endfor %}
{% for job_type, job_spec in job_types.items() %}
4 changes: 1 addition & 3 deletions job_spec/ARIA_AUTORIFT.yml
@@ -55,9 +55,7 @@ AUTORIFT:
- ITS_LIVE_OD
- Ref::granules
timeout: 10800
-compute_environment:
-  name: 'AriaAutorift'
-  instance_types: r6id.xlarge,r6id.2xlarge,r6id.4xlarge,r6id.8xlarge,r6idn.xlarge,r6idn.2xlarge,r6idn.4xlarge,r6idn.8xlarge
+compute_environment: AriaAutorift
vcpu: 1
memory: 31500
secrets:
3 changes: 1 addition & 2 deletions job_spec/ARIA_RAIDER.yml
@@ -38,8 +38,7 @@ ARIA_RAIDER:
- --input-bucket-prefix
- Ref::job_id
timeout: 10800
-compute_environment:
-  import: 'Default'
+compute_environment: Default
vcpu: 1
memory: 7500
secrets:
3 changes: 1 addition & 2 deletions job_spec/AUTORIFT.yml
@@ -52,8 +52,7 @@ AUTORIFT:
- ITS_LIVE_OD
- Ref::granules
timeout: 10800
-compute_environment:
-  import: 'Default'
+compute_environment: Default
vcpu: 1
memory: 31500
secrets:
3 changes: 1 addition & 2 deletions job_spec/AUTORIFT_ITS_LIVE.yml
@@ -66,8 +66,7 @@ AUTORIFT:
- ITS_LIVE_PROD
- Ref::granules
timeout: 10800
-compute_environment:
-  import: 'Default'
+compute_environment: Default
vcpu: 1
memory: 31500
secrets:
3 changes: 1 addition & 2 deletions job_spec/INSAR_GAMMA.yml
@@ -111,8 +111,7 @@ INSAR_GAMMA:
- Ref::phase_filter_parameter
- Ref::granules
timeout: 10800
-compute_environment:
-  import: 'Default'
+compute_environment: Default
vcpu: 1
memory: 31500
secrets: