Updated interface to get_variables_data #272

Merged
merged 15 commits into from
Nov 14, 2024
Changes from 5 commits
85 changes: 47 additions & 38 deletions src/common/io.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright (C) 2010-2021 Modelon AB
# Copyright (C) 2010-2024 Modelon AB
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
@@ -1208,6 +1208,7 @@ def __init__(self, fname, delayed_trajectory_loading = True, allow_file_updates=
self._is_stream = True
delayed_trajectory_loading = False
self._allow_file_updates = allow_file_updates
self._last_set_of_indices = (None, None) # used for dealing with cached data and partial trajectories

data_sections = ["name", "dataInfo", "data_2", "data_3", "data_4"]
if not self._is_stream:
@@ -1331,11 +1332,17 @@ def _get_name_dict(self):

return name_dict

def _can_use_partial_cache(self, start_index, stop_index):
""" Checks if start_index and stop_oindex are equal to the last cached indices. """
return self._allow_file_updates and (self._last_set_of_indices == (start_index, stop_index))
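# Editorial note (not part of this change): the cached partial trajectory is only reused
# when file updates are allowed AND the requested slice matches the (start_index, stop_index)
# pair recorded by the previous get_variables_data call; any other slice forces a re-read
# from the file.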

def _get_trajectory(self, data_index, start_index = 0, stop_index = None):
if isinstance(self._data_2, dict):
self._verify_file_data()

if data_index in self._data_2:
index_in_cache = data_index in self._data_2
partial_cache_ok = self._can_use_partial_cache(start_index, stop_index)
if (index_in_cache and not self._allow_file_updates) or (index_in_cache and partial_cache_ok):
return self._data_2[data_index]

file_position = self._data_2_info["file_position"]
@@ -1344,17 +1351,10 @@ def _get_trajectory(self, data_index, start_index = 0, stop_index = None):
nbr_variables = self._data_2_info["nbr_variables"]

# Account for sub-sets of data
if start_index > 0:
new_file_position = file_position + start_index*sizeof_type*nbr_variables
new_nbr_points = nbr_points - start_index
else:
new_file_position = file_position
new_nbr_points = nbr_points

if stop_index is not None and stop_index > 0:
new_nbr_points = stop_index
if start_index > 0:
new_nbr_points -= start_index
start_index = max(0, start_index)
stop_index = max(0, nbr_points if stop_index is None else min(nbr_points, stop_index))
new_file_position = file_position + start_index*sizeof_type*nbr_variables
new_nbr_points = stop_index - start_index
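# Worked example (editorial illustration, not part of this change):
#   nbr_points = 3, start_index = 0, stop_index = 5   -> stop_index is clamped to 3,
#                                                         new_nbr_points = 3 - 0 = 3
#   nbr_points = 3, start_index = 1, stop_index = None -> stop_index becomes 3,
#                                                         new_nbr_points = 2, and the read
#                                                         starts 1*sizeof_type*nbr_variables
#                                                         bytes into the data section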

self._data_2[data_index] = fmi_util.read_trajectory(
encode(self._fname),
@@ -1554,32 +1554,35 @@ def get_variables_data(self,
names: list[str],
start_index: int = 0,
stop_index: Union[int, None] = None
) -> tuple[list[Trajectory], Union[int, None]]:
) -> tuple[dict[str, Trajectory], Union[int, None]]:
""""
Returns multiple trajectories, sliced to index range.
Note that start_index and stop_index behave as indices for slicing, i.e. array[start_index:stop_index].
This also implies that stop_index = None or stop_index larger than the number of available data points
results in retrieving all the available data points from start_index, i.e. as the slice [start_index:].

Note that (start_index, stop_index) = (None, None) results in the slicing [None:None] which is equivalent to [:].

Returns trajectories for each variable in 'names' with lengths adjusted for the
interval [start_index, stop_index], i.e. partial trajectories.
Out-of-bounds values for start_index and stop_index are automatically corrected,
such that:
Negative values are always adjusted to 0.
A stop_index beyond the number of available data points is adjusted to that number; as an example,
if you set start_index = 0 and stop_index = 5 but there are only 3 data points available,
then this function returns 3 data points.
By default, start_index = 0 and stop_index = None, which implies that the full trajectory is returned.

Parameters::

names --
List of variable names for which to fetch trajectories.

start_index --
Starting index for trajectory slicing.
The index from where the trajectory data starts from.

stop_index --
Stopping index for trajectory slicing.
The index from where the trajectory data ends. If stop_index is set to None,
it implies that all data in the slice [start_index:] is returned.

Raises::
ValueError -- If stop_index < start_index.
ValueError -- If stop_index < start_index.

Returns::
Tuple: (List of trajectories, next start index (non-negative))
Tuple: (dict of trajectories with keys corresponding to variable names, next start index (non-negative))
"""

"""
@@ -1593,9 +1596,11 @@ def get_variables_data(self,
if isinstance(start_index, int) and isinstance(stop_index, int) and stop_index < start_index:
raise ValueError(f"Invalid values for {start_index=} and {stop_index=}, " + \
"'start_index' needs to be less than or equal to 'stop_index'.")
trajectories = []

# Get the time trajectory

trajectories = {}

# Get the corresponding time trajectory
if not self._contains_diagnostic_data:
time = self._get_trajectory(0, start_index, stop_index)
else:
@@ -1607,9 +1612,10 @@ def get_variables_data(self,
stop_index = len(time) + start_index

for name in names:
trajectories.append(self._get_variable_data_as_trajectory(name, time, start_index, stop_index))
trajectories[name] = self._get_variable_data_as_trajectory(name, time, start_index, stop_index)

new_start_index = start_index + len(time) if len(trajectories) > 0 else None
self._last_set_of_indices = (start_index, stop_index) # update them before we exit
return trajectories, new_start_index
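# Editorial usage sketch (assumes a result object 'res' created from this module,
# e.g. res = ResultDymolaBinary("model_result.mat"); not part of this change):
#
#   data, next_start = res.get_variables_data(["time", "x"], start_index=0, stop_index=None)
#   x_traj = data["x"]  # trajectories are now returned as a dict keyed by variable name
#   # for a result file that is still being written, poll incrementally by passing the
#   # returned index back in as the new start_index:
#   more, next_start = res.get_variables_data(["time", "x"], start_index=next_start)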

def _calculate_events_and_steps(self, name):
@@ -1759,7 +1765,7 @@ def __init__(self, model, delimiter=";"):
super().__init__(model)
self.supports['result_max_size'] = True
self._first_point = True

def simulation_start(self):
"""
This method is called before the simulation has started and before
@@ -1811,13 +1817,13 @@ def integration_point(self, solver = None):
#Sets the parameters, if any
if solver and self.options["sensitivities"]:
self.param_sol += [np.array(solver.interpolate_sensitivity(model.time, 0)).flatten()]

max_size = self.options.get("result_max_size", None)
if max_size is not None:
current_size = sys.getsizeof(self.time_sol) + sys.getsizeof(self.real_sol) + \
sys.getsizeof(self.int_sol) + sys.getsizeof(self.bool_sol) + \
sys.getsizeof(self.param_sol)

verify_result_size(self._first_point, current_size, previous_size, max_size, self.options["ncp"], self.model.time)
self._first_point = False

@@ -2389,7 +2395,7 @@ def simulation_start(self):
self.real_var_ref = np.array(self.real_var_ref)
self.int_var_ref = np.array(self.int_var_ref)
self.bool_var_ref = np.array(self.bool_var_ref)

def _write(self, msg):
self._current_file_size = self._current_file_size+len(msg)
self._file.write(msg)
@@ -2521,6 +2527,9 @@ class ResultSizeError(JIOError):
Exception that is raised when a set maximum result size is exceeded.
"""

class InvalidIndexError(JIOError):
""" Exception that is raised when indices for variable trajectories are invalid. """

def robust_float(value):
"""
Function for robust handling of float values such as INF and NAN.
@@ -2767,16 +2776,16 @@ def integration_point(self, solver = None):

def diagnostics_point(self, diag_data):
""" Generates a data point for diagnostics data by invoking the util function save_diagnostics_point. """
self.dump_data_internal.save_diagnostics_point(diag_data)
self.dump_data_internal.save_diagnostics_point(diag_data)
self.nbr_diag_points += 1
self._make_consistent(diag=True)

def _make_consistent(self, diag=False):
"""
This method makes sure that the result file is always consistent, meaning that it is
always possible to load the result file in the result class. The method makes the
always possible to load the result file in the result class. The method makes the
result file consistent by going back in the result file and updates the final time
as well as the number of result points in the file in specific locations of the
as well as the number of result points in the file in specific locations of the
result file. In the end, it puts the file pointer back to the end of the file (which
allows further writing of new result points)
"""
@@ -2841,8 +2850,8 @@ def verify_result_size(first_point, current_size, previous_size, max_size, ncp,
raise ResultSizeError(msg + "To change the maximum allowed result file size, please use the option 'result_max_size'")

if current_size > max_size:
raise ResultSizeError("Maximum size of the result reached (limit: %g GB) at time t=%g. "
"To change the maximum allowed result size, please use the option "
raise ResultSizeError("Maximum size of the result reached (limit: %g GB) at time t=%g. "
"To change the maximum allowed result size, please use the option "
"'result_max_size' or consider reducing the number of communication "
"points alternatively the number of variables to store result for."%(max_size/1024**3, time))

@@ -2873,7 +2882,7 @@ def get_result_handler(model, opts):
result_handler = ResultHandlerDummy(model)
else:
raise fmi.FMUException("Unknown option to result_handling.")

if (opts.get("result_max_size", 0) > 0) and not result_handler.supports["result_max_size"]:
logging_module.warning("The chosen result handler does not support limiting the result size. Ignoring option 'result_max_size'.")
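# Editorial sketch (assumed option usage, not part of this change): the size cap is set
# through the simulation options, e.g.:
#
#   opts = model.simulate_options()
#   opts["result_max_size"] = 2 * 1024**3  # cap the stored result at 2 GB
#   # result handlers that do not support the option only trigger the warning above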
