Skip to content

Commit

Permalink
Remove ert_config from batch sim
Browse files Browse the repository at this point in the history
  • Loading branch information
frode-aarstad committed Nov 1, 2024
1 parent 87136c7 commit 1488d88
Show file tree
Hide file tree
Showing 23 changed files with 865 additions and 421 deletions.
2 changes: 1 addition & 1 deletion src/ert/cli/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def execute_workflow(
msg = "Workflow {} is not in the list of available workflows"
logger.error(msg.format(workflow_name))
return
runner = WorkflowRunner(workflow, storage, ert_config=ert_config)
runner = WorkflowRunner(workflow=workflow, storage=storage, ert_config=ert_config)
runner.run_blocking()
if not all(v["completed"] for v in runner.workflowReport().values()):
logger.error(f"Workflow {workflow_name} failed!")
317 changes: 159 additions & 158 deletions src/ert/config/ert_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,164 @@ def site_config_location() -> str:
)


def create_forward_model_json(
context: Substitutions,
forward_model_steps: List[ForwardModelStep],
run_id: Optional[str],
iens: int = 0,
itr: int = 0,
user_config_file: Optional[str] = "",
env_vars: Optional[Dict[str, str]] = None,
skip_pre_experiment_validation: bool = False,
) -> Dict[str, Any]:
if env_vars is None:
env_vars = {}

class Substituter:
def __init__(self, fm_step):
fm_step_args = ",".join(
[f"{key}={value}" for key, value in fm_step.private_args.items()]
)
fm_step_description = f"{fm_step.name}({fm_step_args})"
self.substitution_context_hint = (
f"parsing forward model step `FORWARD_MODEL {fm_step_description}` - "
"reconstructed, with defines applied during parsing"
)
self.copy_private_args = Substitutions()
for key, val in fm_step.private_args.items():
self.copy_private_args[key] = context.substitute_real_iter(
val, iens, itr
)

@overload
def substitute(self, string: str) -> str: ...

@overload
def substitute(self, string: None) -> None: ...

def substitute(self, string):
if string is None:
return string
string = self.copy_private_args.substitute(
string, self.substitution_context_hint, 1, warn_max_iter=False
)
return context.substitute_real_iter(string, iens, itr)

def filter_env_dict(self, d):
result = {}
for key, value in d.items():
new_key = self.substitute(key)
new_value = self.substitute(value)
if new_value is None:
result[new_key] = None
elif not (new_value[0] == "<" and new_value[-1] == ">"):
# Remove values containing "<XXX>". These are expected to be
# replaced by substitute, but were not.
result[new_key] = new_value
else:
logger.warning(
f"Environment variable {new_key} skipped due to"
f" unmatched define {new_value}",
)
# Its expected that empty dicts be replaced with "null"
# in jobs.json
if not result:
return None
return result

def handle_default(fm_step: ForwardModelStep, arg: str) -> str:
return fm_step.default_mapping.get(arg, arg)

for fm_step in forward_model_steps:
for key, val in fm_step.private_args.items():
if key in context and key != val and context[key] != val:
logger.info(
f"Private arg '{key}':'{val}' chosen over"
f" global '{context[key]}' in forward model step {fm_step.name}"
)
config_file_path = Path(user_config_file) if user_config_file is not None else None
config_path = str(config_file_path.parent) if config_file_path else ""
config_file = str(config_file_path.name) if config_file_path else ""

job_list_errors = []
job_list: List[ForwardModelStepJSON] = []
for idx, fm_step in enumerate(forward_model_steps):
substituter = Substituter(fm_step)
fm_step_json = {
"name": substituter.substitute(fm_step.name),
"executable": substituter.substitute(fm_step.executable),
"target_file": substituter.substitute(fm_step.target_file),
"error_file": substituter.substitute(fm_step.error_file),
"start_file": substituter.substitute(fm_step.start_file),
"stdout": (
substituter.substitute(fm_step.stdout_file) + f".{idx}"
if fm_step.stdout_file
else None
),
"stderr": (
substituter.substitute(fm_step.stderr_file) + f".{idx}"
if fm_step.stderr_file
else None
),
"stdin": substituter.substitute(fm_step.stdin_file),
"argList": [
handle_default(fm_step, substituter.substitute(arg))
for arg in fm_step.arglist
],
"environment": substituter.filter_env_dict(fm_step.environment),
"exec_env": substituter.filter_env_dict(fm_step.exec_env),
"max_running_minutes": fm_step.max_running_minutes,
}

try:
if not skip_pre_experiment_validation:
fm_step_json = fm_step.validate_pre_realization_run(fm_step_json)
except ForwardModelStepValidationError as exc:
job_list_errors.append(
ErrorInfo(
message=f"Validation failed for "
f"forward model step {fm_step.name}: {exc!s}"
).set_context(fm_step.name)
)

job_list.append(fm_step_json)

if job_list_errors:
raise ConfigValidationError.from_collected(job_list_errors)

return {
"global_environment": env_vars,
"config_path": config_path,
"config_file": config_file,
"jobList": job_list,
"run_id": run_id,
"ert_pid": str(os.getpid()),
}


def forward_model_data_to_json(
substitutions: Substitutions,
forward_model_steps: List[ForwardModelStep],
env_vars: Dict[str, str],
user_config_file: Optional[str] = "",
run_id: Optional[str] = None,
iens: int = 0,
itr: int = 0,
context_env: Optional[Dict[str, str]] = None,
):
if context_env is None:
context_env = {}
return create_forward_model_json(
context=substitutions,
forward_model_steps=forward_model_steps,
user_config_file=user_config_file,
env_vars={**env_vars, **context_env},
run_id=run_id,
iens=iens,
itr=itr,
)


@dataclass
class ErtConfig:
DEFAULT_ENSPATH: ClassVar[str] = "storage"
Expand Down Expand Up @@ -615,7 +773,7 @@ def _create_list_of_forward_model_steps_to_run(
for fm_step in fm_steps:
if fm_step.name in cls.PREINSTALLED_FORWARD_MODEL_STEPS:
try:
substituted_json = cls._create_forward_model_json(
substituted_json = create_forward_model_json(
run_id=None,
context=substitutions,
forward_model_steps=[fm_step],
Expand Down Expand Up @@ -644,163 +802,6 @@ def _create_list_of_forward_model_steps_to_run(
def forward_model_step_name_list(self) -> List[str]:
return [j.name for j in self.forward_model_steps]

def forward_model_data_to_json(
self,
run_id: Optional[str] = None,
iens: int = 0,
itr: int = 0,
context_env: Optional[Dict[str, str]] = None,
):
if context_env is None:
context_env = {}
return self._create_forward_model_json(
context=self.substitutions,
forward_model_steps=self.forward_model_steps,
user_config_file=self.user_config_file,
env_vars={**self.env_vars, **context_env},
run_id=run_id,
iens=iens,
itr=itr,
)

@classmethod
def _create_forward_model_json(
cls,
context: Substitutions,
forward_model_steps: List[ForwardModelStep],
run_id: Optional[str],
iens: int = 0,
itr: int = 0,
user_config_file: Optional[str] = "",
env_vars: Optional[Dict[str, str]] = None,
skip_pre_experiment_validation: bool = False,
) -> Dict[str, Any]:
if env_vars is None:
env_vars = {}

class Substituter:
def __init__(self, fm_step):
fm_step_args = ",".join(
[f"{key}={value}" for key, value in fm_step.private_args.items()]
)
fm_step_description = f"{fm_step.name}({fm_step_args})"
self.substitution_context_hint = (
f"parsing forward model step `FORWARD_MODEL {fm_step_description}` - "
"reconstructed, with defines applied during parsing"
)
self.copy_private_args = Substitutions()
for key, val in fm_step.private_args.items():
self.copy_private_args[key] = context.substitute_real_iter(
val, iens, itr
)

@overload
def substitute(self, string: str) -> str: ...

@overload
def substitute(self, string: None) -> None: ...

def substitute(self, string):
if string is None:
return string
string = self.copy_private_args.substitute(
string, self.substitution_context_hint, 1, warn_max_iter=False
)
return context.substitute_real_iter(string, iens, itr)

def filter_env_dict(self, d):
result = {}
for key, value in d.items():
new_key = self.substitute(key)
new_value = self.substitute(value)
if new_value is None:
result[new_key] = None
elif not (new_value[0] == "<" and new_value[-1] == ">"):
# Remove values containing "<XXX>". These are expected to be
# replaced by substitute, but were not.
result[new_key] = new_value
else:
logger.warning(
f"Environment variable {new_key} skipped due to"
f" unmatched define {new_value}",
)
# Its expected that empty dicts be replaced with "null"
# in jobs.json
if not result:
return None
return result

def handle_default(fm_step: ForwardModelStep, arg: str) -> str:
return fm_step.default_mapping.get(arg, arg)

for fm_step in forward_model_steps:
for key, val in fm_step.private_args.items():
if key in context and key != val and context[key] != val:
logger.info(
f"Private arg '{key}':'{val}' chosen over"
f" global '{context[key]}' in forward model step {fm_step.name}"
)
config_file_path = (
Path(user_config_file) if user_config_file is not None else None
)
config_path = str(config_file_path.parent) if config_file_path else ""
config_file = str(config_file_path.name) if config_file_path else ""

job_list_errors = []
job_list: List[ForwardModelStepJSON] = []
for idx, fm_step in enumerate(forward_model_steps):
substituter = Substituter(fm_step)
fm_step_json = {
"name": substituter.substitute(fm_step.name),
"executable": substituter.substitute(fm_step.executable),
"target_file": substituter.substitute(fm_step.target_file),
"error_file": substituter.substitute(fm_step.error_file),
"start_file": substituter.substitute(fm_step.start_file),
"stdout": (
substituter.substitute(fm_step.stdout_file) + f".{idx}"
if fm_step.stdout_file
else None
),
"stderr": (
substituter.substitute(fm_step.stderr_file) + f".{idx}"
if fm_step.stderr_file
else None
),
"stdin": substituter.substitute(fm_step.stdin_file),
"argList": [
handle_default(fm_step, substituter.substitute(arg))
for arg in fm_step.arglist
],
"environment": substituter.filter_env_dict(fm_step.environment),
"exec_env": substituter.filter_env_dict(fm_step.exec_env),
"max_running_minutes": fm_step.max_running_minutes,
}

try:
if not skip_pre_experiment_validation:
fm_step_json = fm_step.validate_pre_realization_run(fm_step_json)
except ForwardModelStepValidationError as exc:
job_list_errors.append(
ErrorInfo(
message=f"Validation failed for "
f"forward model step {fm_step.name}: {exc!s}"
).set_context(fm_step.name)
)

job_list.append(fm_step_json)

if job_list_errors:
raise ConfigValidationError.from_collected(job_list_errors)

return {
"global_environment": env_vars,
"config_path": config_path,
"config_file": config_file,
"jobList": job_list,
"run_id": run_id,
"ert_pid": str(os.getpid()),
}

@classmethod
def _workflows_from_dict(
cls,
Expand Down
Loading

0 comments on commit 1488d88

Please sign in to comment.