Skip to content

Commit

Permalink
Time loading and writing during update
Browse files Browse the repository at this point in the history
  • Loading branch information
Feda Curic authored and dafeda committed Jun 20, 2023
1 parent 679060c commit 0007e8e
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 19 deletions.
3 changes: 2 additions & 1 deletion src/ert/_c_wrappers/analysis/analysis_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ def set_var(self, var_name: str, value: Union[float, int, bool, str]):

except ValueError:
raise ConfigValidationError(
f"Variable {var_name!r} with value {value!r} has incorrect type."
f"Variable {var_name!r} with value {value!r} has "
f"incorrect type."
f" Expected type {var['type'].__name__!r} but received"
f" value {value!r} of type {type(value).__name__!r}"
)
Expand Down
48 changes: 30 additions & 18 deletions src/ert/analysis/_es_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,13 @@ def analysis_ES(
iens_active_index = [i for i in range(len(ens_mask)) if ens_mask[i]]

progress_callback(Progress(Task("Loading data", 1, 3), None))
start = time.time()
temp_storage = _create_temporary_parameter_storage(
source_fs, ensemble_config, iens_active_index
)
end = time.time()
elapsed = end - start
print(f"Time to run _create_temporary_parameter_storage: {elapsed}")

ensemble_size = sum(ens_mask)
param_ensemble = _param_ensemble_for_projection(
Expand All @@ -413,9 +417,10 @@ def analysis_ES(

progress_callback(Progress(Task("Updating data", 2, 3), None))
for update_step in updatestep:
print("Loading responses and observations...")
start = time.time()
try:
print("Loading responses and observations...")
Y, (
S, (
observation_values,
observation_errors,
update_snapshot,
Expand All @@ -430,6 +435,9 @@ def analysis_ES(
)
except IndexError as e:
raise ErtAnalysisError(e) from e
end = time.time()
elapsed = end - start
print(f"Time to run _load_observations_and_responses: {elapsed}")

# pylint: disable=unsupported-assignment-operation
smoother_snapshot.update_step_snapshots[update_step.name] = update_snapshot
Expand All @@ -445,7 +453,7 @@ def analysis_ES(
noise = rng.standard_normal(size=(num_obs, ensemble_size))

if module.localization():
Y_prime = Y - Y.mean(axis=1, keepdims=True)
Y_prime = S - S.mean(axis=1, keepdims=True)
C_YY = Y_prime @ Y_prime.T / (ensemble_size - 1)
Sigma_Y = np.diag(np.sqrt(np.diag(C_YY)))
batch_size = 1000
Expand Down Expand Up @@ -476,14 +484,17 @@ def analysis_ES(
np.linalg.inv(Sigma_A) @ C_AY @ np.linalg.inv(Sigma_Y)
)
c_bool = c_AY > correlation_threshold
# Some parameters might be significantly correlated to the exact same
# responses, making up what we call a `parameter group``.
# Some parameters might be significantly correlated
# to the exact same responses,
# making up what we call a `parameter group``.
# We want to call the update only once per such parameter group
# to speed up computation.
param_groups = np.unique(c_bool, axis=0)

# Drop the parameter group that does not correlate to any responses.
row_with_all_false = np.all(param_groups == False, axis=1)
row_with_all_false = np.all(
param_groups == False, axis=1
) # noqa: E712, E501
param_groups = param_groups[~row_with_all_false]

for grp in param_groups:
Expand All @@ -494,11 +505,11 @@ def analysis_ES(
X_chunk = temp_storage[parameter.name][param_batch_idx, :][
row_indices, :
]
Y_chunk = Y[grp, :]
S_chunk = S[grp, :]
observation_errors_loc = observation_errors[grp]
observation_values_loc = observation_values[grp]
smoother.fit(
Y_chunk,
S_chunk,
observation_errors_loc,
observation_values_loc,
noise=noise[grp],
Expand All @@ -512,7 +523,7 @@ def analysis_ES(
else:
for parameter in update_step.parameters:
smoother.fit(
Y,
S,
observation_errors,
observation_values,
noise=noise,
Expand All @@ -529,18 +540,16 @@ def analysis_ES(
temp_storage[parameter.name]
)

A_with_rowscaling = _get_row_scaling_A_matrices(
if params_with_row_scaling := _get_params_with_row_scaling(
temp_storage, update_step.row_scaling_parameters
)

if A_with_rowscaling:
A_with_rowscaling = ensemble_smoother_update_step_row_scaling(
Y,
A_with_rowscaling,
):
params_with_row_scaling = ensemble_smoother_update_step_row_scaling(
S,
params_with_row_scaling,
observation_errors,
observation_values,
noise,
truncation,
module.get_truncation(),
ies.InversionType(module.inversion),
)
for parameter, (A, _) in zip(
Expand All @@ -552,6 +561,9 @@ def analysis_ES(
_save_temp_storage_to_disk(
target_fs, ensemble_config, temp_storage, iens_active_index
)
end = time.time()
elapsed = end - start
print(f"Time to run _save_temporary_storage_to_disk: {elapsed}")


def analysis_IES(
Expand Down Expand Up @@ -611,7 +623,7 @@ def analysis_IES(
f"No active observations for update step: {update_step.name}."
)

noise = rng.standard_normal(size=(len(observation_values), S.shape[1]))
noise = rng.standard_normal(size=(len(observation_values), Y.shape[1]))
for parameter in update_step.parameters:
iterative_ensemble_smoother.fit(
Y,
Expand Down

0 comments on commit 0007e8e

Please sign in to comment.