Updated interface to get_variables_data (#272)
* Updated interface to get_variables_data

* Removed check for index values for now and only keep return type change

* Added fix to adjust indices if out of bounds and updated docstring

* Added fix for cached values affecting partial trajectories and added test

* Removed print-statements

* Added the fix for handling cache also for diagnostics variable

* Removed unused exception

* More robust solution for next start index

* Fixed issue with stop_index and added more tests

* Changed if len(traj) > 0 to simply if traj

* Fixed issue with start out of bounds

* Added another fix for stop_index>data points, updated docstring

* Fixed if-statement for stop_index and removed use of inf

* Improved code and added test for next start index

* Changed for loop to another expression with better performance
modelonrobinandersson authored Nov 14, 2024
1 parent dfe267e commit e271753
Showing 3 changed files with 271 additions and 84 deletions.
132 changes: 81 additions & 51 deletions src/common/io.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

-# Copyright (C) 2010-2021 Modelon AB
+# Copyright (C) 2010-2024 Modelon AB
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
@@ -1208,6 +1208,7 @@ def __init__(self, fname, delayed_trajectory_loading = True, allow_file_updates=
self._is_stream = True
delayed_trajectory_loading = False
self._allow_file_updates = allow_file_updates
+self._last_set_of_indices = (None, None) # used for dealing with cached data and partial trajectories

data_sections = ["name", "dataInfo", "data_2", "data_3", "data_4"]
if not self._is_stream:
@@ -1331,11 +1332,17 @@ def _get_name_dict(self):

return name_dict

+def _can_use_partial_cache(self, start_index: int, stop_index: Union[int, None]):
+    """ Checks if start_index and stop_index are equal to the last cached indices. """
+    return self._allow_file_updates and (self._last_set_of_indices == (start_index, stop_index))

def _get_trajectory(self, data_index, start_index = 0, stop_index = None):
if isinstance(self._data_2, dict):
self._verify_file_data()

-if data_index in self._data_2:
+index_in_cache = data_index in self._data_2
+partial_cache_ok = self._can_use_partial_cache(start_index, stop_index)
+if (index_in_cache and not self._allow_file_updates) or (index_in_cache and partial_cache_ok):
return self._data_2[data_index]

file_position = self._data_2_info["file_position"]
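
The guard introduced above (and repeated for the diagnostics and interpolated variants below) reduces to a single predicate: cached data may be returned when the file cannot change, or when the file may change but the caller requests exactly the slice that populated the cache. A minimal standalone sketch (the helper name is hypothetical; the logic mirrors the diff):

def can_return_cached(index_in_cache, allow_file_updates, last_indices, start_index, stop_index):
    # Safe to reuse the cache if the file is frozen, or if the requested
    # slice is identical to the one that was cached last.
    partial_cache_ok = allow_file_updates and (last_indices == (start_index, stop_index))
    return index_in_cache and (not allow_file_updates or partial_cache_ok)
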
@@ -1344,17 +1351,13 @@ def _get_trajectory(self, data_index, start_index = 0, stop_index = None):
nbr_variables = self._data_2_info["nbr_variables"]

# Account for sub-sets of data
-if start_index > 0:
-    new_file_position = file_position + start_index*sizeof_type*nbr_variables
-    new_nbr_points = nbr_points - start_index
-else:
-    new_file_position = file_position
-    new_nbr_points = nbr_points
-
-if stop_index is not None and stop_index > 0:
-    new_nbr_points = stop_index
-    if start_index > 0:
-        new_nbr_points -= start_index
+start_index = max(0, start_index)
+stop_index = max(0, nbr_points if stop_index is None else min(nbr_points, stop_index))
+new_file_position = file_position + start_index*sizeof_type*nbr_variables
+# Finally when stop_index = None, we can end up with start > stop,
+# therefore we need to use min(start, stop)
+start_index = min(start_index, stop_index)
+new_nbr_points = stop_index - start_index

self._data_2[data_index] = fmi_util.read_trajectory(
encode(self._fname),
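
The clamping above guarantees 0 <= start_index <= stop_index <= nbr_points before any bytes are read; traced with hypothetical numbers (the expressions are the same as in the diff):

nbr_points = 3                      # data points currently in the file
start_index, stop_index = 5, None   # caller requests past the end of the data
start_index = max(0, start_index)                                                        # -> 5
stop_index = max(0, nbr_points if stop_index is None else min(nbr_points, stop_index))   # -> 3
start_index = min(start_index, stop_index)                                               # -> 3
new_nbr_points = stop_index - start_index                                                # -> 0, an empty trajectory
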
@@ -1373,10 +1376,12 @@ def _get_diagnostics_trajectory(self, data_index, start_index = 0, stop_index = None):
""" Returns trajectory for the diagnostics variable that corresponds to index 'data_index'. """
self._verify_file_data()

-if data_index in self._data_3:
+index_in_cache = data_index in self._data_3
+partial_cache_ok = self._can_use_partial_cache(start_index, stop_index)
+if (index_in_cache and not self._allow_file_updates) or (index_in_cache and partial_cache_ok):
    return self._data_3[data_index]
-self._data_3[data_index] = self._read_trajectory_data(data_index, True, start_index, stop_index)
-return self._data_3[data_index][start_index:stop_index]
+self._data_3[data_index] = self._read_trajectory_data(data_index, True, start_index, stop_index)[start_index:stop_index]
+return self._data_3[data_index]

def _read_trajectory_data(self, data_index, read_diag_data, start_index = 0, stop_index = None):
""" Reads corresponding trajectory data for variable with index 'data_index',
@@ -1413,11 +1418,13 @@ def _read_trajectory_data(self, data_index, read_diag_data, start_index = 0, stop_index = None):

return data

-def _get_interpolated_trajectory(self, data_index: int, start_index: int = None, stop_index: int = None) -> Trajectory:
+def _get_interpolated_trajectory(self, data_index: int, start_index: int = 0, stop_index: int = None) -> Trajectory:
""" Returns an interpolated trajectory for variable of corresponding index 'data_index'. """
self._verify_file_data()

-if data_index in self._data_2:
+index_in_cache = data_index in self._data_2
+partial_cache_ok = self._can_use_partial_cache(start_index, stop_index)
+if (index_in_cache and not self._allow_file_updates) or (index_in_cache and partial_cache_ok):
return self._data_2[data_index]

diag_time_vector = self._get_diagnostics_trajectory(0, start_index, stop_index)
@@ -1426,8 +1433,9 @@ def _get_interpolated_trajectory(self, data_index: int, start_index: int = None, stop_index: int = None) -> Trajectory:

f = scipy.interpolate.interp1d(time_vector, data, fill_value="extrapolate")

+# note that we don't need to slice here because diag_time_vector is already sliced accordingly
self._data_2[data_index] = f(diag_time_vector)
-return self._data_2[data_index][start_index:stop_index]
+return self._data_2[data_index]
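
When diagnostics data is present, each variable is resampled onto the (already sliced) diagnostics time grid, so no further slicing is needed. The resampling step in isolation (hypothetical arrays, same scipy call as above):

import numpy as np
import scipy.interpolate

time_vector = np.array([0.0, 1.0, 2.0])            # the variable's own time points
data = np.array([0.0, 10.0, 20.0])                 # the variable's values
diag_time_vector = np.array([0.0, 0.5, 1.5, 2.5])  # sliced diagnostics grid

f = scipy.interpolate.interp1d(time_vector, data, fill_value="extrapolate")
resampled = f(diag_time_vector)                    # -> [0., 5., 15., 25.]
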

def _get_description(self):
if not self._description:
@@ -1554,32 +1562,41 @@ def get_variables_data(self,
names: list[str],
start_index: int = 0,
stop_index: Union[int, None] = None
-) -> tuple[list[Trajectory], Union[int, None]]:
+) -> tuple[dict[str, Trajectory], Union[int, None]]:
""""
-Returns multiple trajectories, sliced to index range.
-Note that start_index and stop_index behaves as indices for slicing, i.e. array[start_index:stop_index].
-This also implies that stop_index = None or stop_index larger than the number of available data points
-results in retrieving all the available data points from start_index, i.e. as the slice [start_index:].
-Note that (start_index, stop_index) = (None, None) results in the slicing [None:None] which is equivalent to [:].
+Returns trajectories for each variable in 'names' with lengths adjusted for the
+interval [start_index, stop_index], i.e. partial trajectories.
+
+Improper values for start_index and stop_index that are out of bounds are automatically corrected, such that:
+    Negative values are always adjusted to 0 or larger.
+    Out of bounds for stop_index is adjusted to the number of available data points, for example:
+        if start_index = 0, stop_index = 5 and there are only 3 data points available,
+        then the returned trajectories are of length 3.
+    If start_index is larger than or equal to the number of available data points, empty trajectories
+    are returned, i.e. trajectories of length 0.
+
+Note that trajectories for parameters are always of length 2 if indices 0 and 1 are
+part of the requested trajectory, since they reflect the values before and after initialization.
+Therefore, if you request a trajectory for a parameter with start_index >= 2, the returned trajectory is empty.
+
+By default, start_index = 0 and stop_index = None, which implies that the full trajectory is returned.
Parameters::

    names --
        List of variable names for which to fetch trajectories.

    start_index --
-        Starting index for trajectory slicing.
+        The index from which the trajectory data starts.

    stop_index --
-        Stopping index for trajectory slicing.
+        The index at which the trajectory data ends. If stop_index is set to None,
+        all data in the slice [start_index:] is returned.

Raises::

    ValueError -- If stop_index < start_index.

Returns::

-    Tuple: (List of trajectories, next start index (non-negative))
+    Tuple: (dict of trajectories with keys corresponding to variable names, next start index (non-negative))
"""

"""
@@ -1593,25 +1610,40 @@ def get_variables_data(self,
if isinstance(start_index, int) and isinstance(stop_index, int) and stop_index < start_index:
raise ValueError(f"Invalid values for {start_index=} and {stop_index=}, " + \
"'start_index' needs to be less than or equal to 'stop_index'.")
-trajectories = []
-
-# Get the time trajectory
+trajectories = {}
+
+# Get the corresponding time trajectory
if not self._contains_diagnostic_data:
time = self._get_trajectory(0, start_index, stop_index)
else:
# Since we interpolate data if diagnostics is enabled
time = self._get_diagnostics_trajectory(0, start_index, stop_index)

+# Need to account for data that might be added while we are iterating over 'names' later.
+# If stop_index > number of data points, and data gets added while we are iterating,
+# then we might get trajectories of unequal lengths. Therefore, ensure we set stop_index here accordingly.
+if stop_index is None:
+    stop_index = len(time) + start_index
+else:
+    stop_index = min(len(time) + start_index, stop_index)

for name in names:
-trajectories.append(self._get_variable_data_as_trajectory(name, time, start_index, stop_index))
+trajectories[name] = self._get_variable_data_as_trajectory(name, time, start_index, stop_index)

-new_start_index = start_index + len(time) if len(trajectories) > 0 else None
+largest_trajectory_length = self._find_max_trajectory_length(trajectories)
+new_start_index = (start_index + largest_trajectory_length) if trajectories else start_index
+
+self._last_set_of_indices = (start_index, stop_index) # update them before we exit
return trajectories, new_start_index

+def _find_max_trajectory_length(self, trajectories):
+    """
+    Given a dict of trajectories, find the length of the largest trajectory
+    among the set of continuous variables. We disregard parameters/constants since they are not stored
+    with the same amount of data points as trajectories for continuous variables.
+    """
+    return max([0] + [len(t.x) for v, t in trajectories.items() if self.is_variable(v)])

def _calculate_events_and_steps(self, name):
if name in self._data_3:
return self._data_3[name]
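
Taken together, get_variables_data now returns a dict keyed by variable name plus a non-negative next start index, and stop_index is pinned to the data available when the time trajectory was read, so writes that land mid-call cannot produce trajectories of unequal length. A usage sketch for polling a growing result file (the file name, variable names, and termination check are hypothetical):

from pyfmi.common.io import ResultDymolaBinary

res = ResultDymolaBinary("result.mat", allow_file_updates=True)
start_index = 0
while not simulation_done():  # hypothetical termination check
    trajectories, start_index = res.get_variables_data(["time", "x"], start_index=start_index)
    for name, traj in trajectories.items():
        consume(name, traj.x)  # hypothetical consumer of the partial trajectories
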
@@ -1686,15 +1718,13 @@ def is_variable(self, name):
return True
elif '{}.'.format(DiagnosticsBase.calculated_diagnostics['nbr_state_limits_step']['name']) in name:
return True

variable_index = self.get_variable_index(name)
data_mat = self._dataInfo[0][variable_index]
if data_mat<1:
if data_mat < 1:
data_mat = 1

-if data_mat == 1:
-    return False
-else:
-    return True
+return data_mat != 1
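
The simplified return above is equivalent to the removed if/else; a quick equivalence check with hypothetical data-matrix values:

for data_mat, expected in [(0, False), (1, False), (2, True)]:
    if data_mat < 1:
        data_mat = 1
    assert (data_mat != 1) == expected  # matrix 1 holds parameters/constants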

def is_negated(self, name):
"""
@@ -1759,7 +1789,7 @@ def __init__(self, model, delimiter=";"):
super().__init__(model)
self.supports['result_max_size'] = True
self._first_point = True

def simulation_start(self):
"""
This method is called before the simulation has started and before
@@ -1811,13 +1841,13 @@ def integration_point(self, solver = None):
#Sets the parameters, if any
if solver and self.options["sensitivities"]:
self.param_sol += [np.array(solver.interpolate_sensitivity(model.time, 0)).flatten()]

max_size = self.options.get("result_max_size", None)
if max_size is not None:
current_size = sys.getsizeof(self.time_sol) + sys.getsizeof(self.real_sol) + \
sys.getsizeof(self.int_sol) + sys.getsizeof(self.bool_sol) + \
sys.getsizeof(self.param_sol)

verify_result_size(self._first_point, current_size, previous_size, max_size, self.options["ncp"], self.model.time)
self._first_point = False

@@ -2389,7 +2419,7 @@ def simulation_start(self):
self.real_var_ref = np.array(self.real_var_ref)
self.int_var_ref = np.array(self.int_var_ref)
self.bool_var_ref = np.array(self.bool_var_ref)

def _write(self, msg):
self._current_file_size = self._current_file_size+len(msg)
self._file.write(msg)
@@ -2767,16 +2797,16 @@ def integration_point(self, solver = None):

def diagnostics_point(self, diag_data):
""" Generates a data point for diagnostics data by invoking the util function save_diagnostics_point. """
self.dump_data_internal.save_diagnostics_point(diag_data)
self.nbr_diag_points += 1
self._make_consistent(diag=True)

def _make_consistent(self, diag=False):
"""
This method makes sure that the result file is always consistent, meaning that it is
always possible to load the result file in the result class. The method makes the
result file consistent by going back in the result file and updating the final time
as well as the number of result points in the file in specific locations of the
result file. In the end, it puts the file pointer back to the end of the file (which
allows further writing of new result points)
"""
@@ -2841,8 +2871,8 @@ def verify_result_size(first_point, current_size, previous_size, max_size, ncp,
raise ResultSizeError(msg + "To change the maximum allowed result file size, please use the option 'result_max_size'")

if current_size > max_size:
raise ResultSizeError("Maximum size of the result reached (limit: %g GB) at time t=%g. "
"To change the maximum allowed result size, please use the option "
"'result_max_size' or consider reducing the number of communication "
"points alternatively the number of variables to store result for."%(max_size/1024**3, time))

@@ -2873,7 +2903,7 @@ def get_result_handler(model, opts):
result_handler = ResultHandlerDummy(model)
else:
raise fmi.FMUException("Unknown option to result_handling.")

if (opts.get("result_max_size", 0) > 0) and not result_handler.supports["result_max_size"]:
logging_module.warning("The chosen result handler does not support limiting the result size. Ignoring option 'result_max_size'.")

14 changes: 7 additions & 7 deletions src/pyfmi/fmi.pyx
@@ -692,7 +692,7 @@ cdef class ModelBase:

if self._additional_logger:
self._additional_logger(module, log_level, message)

if self._max_log_size_msg_sent:
return

@@ -4583,7 +4583,7 @@ cdef class FMUModelBase2(ModelBase):

if nref == 0: ## get_string([])
return []

cdef FMIL.fmi2_string_t* output_value = <FMIL.fmi2_string_t*>FMIL.malloc(sizeof(FMIL.fmi2_string_t)*nref)

self._log_handler.capi_start_callback(self._max_log_size_msg_sent, self._current_log_size)
@@ -4977,7 +4977,7 @@ cdef class FMUModelBase2(ModelBase):
def set_debug_logging(self, logging_on, categories = []):
"""
Specifies if the debugging should be turned on or off and calls fmi2SetDebugLogging
for the specified categories, after checking they are valid.
for the specified categories, after checking they are valid.
Parameters::
@@ -5827,7 +5827,7 @@ cdef class FMUModelBase2(ModelBase):
relative_quantity = FMIL.fmi2_import_get_real_variable_relative_quantity(real_variable)

return relative_quantity == FMI2_TRUE

cpdef get_variable_unbounded(self, variable_name):
"""
Get the unbounded attribute of a real variable.
@@ -7853,7 +7853,7 @@ cdef class FMUModelME2(FMUModelBase2):
Deallocate memory allocated
"""
self._invoked_dealloc = 1

if self._initialized_fmu == 1:
FMIL.fmi2_import_terminate(self._fmu)

@@ -9067,8 +9067,8 @@ cdef class LogHandler:
pass

cdef class LogHandlerDefault(LogHandler):
"""Default LogHandler that uses checkpoints around FMI CAPI calls to
ensure logs are truncated at checkpoints. For FMUs generating XML during
"""Default LogHandler that uses checkpoints around FMI CAPI calls to
ensure logs are truncated at checkpoints. For FMUs generating XML during
CAPI calls, this ensures valid XML. """
def __init__(self, max_log_size):
super().__init__(max_log_size)
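
The checkpointing described in the docstring brackets every FMI C-API call, so a log truncated by the size limit still ends at a consistent point. Schematically (the call site is hypothetical, and capi_end_callback with matching arguments is assumed to mirror the capi_start_callback call visible earlier in this diff):

log_handler.capi_start_callback(max_log_size_msg_sent, current_log_size)
status = fmu_capi_call()  # hypothetical stand-in for any C-API call that may emit log records
log_handler.capi_end_callback(max_log_size_msg_sent, current_log_size)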
(Diff for the third changed file, the added tests, is not shown.)
