Source code for eta_utility.simulators.fmu

"""The FMUSimulator class enables easy simulation of FMU files."""

from __future__ import annotations

import itertools as it
import shutil
from collections.abc import Mapping
from datetime import timedelta
from logging import getLogger
from typing import TYPE_CHECKING

import numpy as np
from fmpy import extract, read_model_description
from fmpy.fmi2 import FMU2Model, FMU2Slave
from fmpy.sundials import CVodeSolver
from fmpy.util import compile_platform_binary

if TYPE_CHECKING:
    from collections.abc import Sequence
    from typing import Any

    from fmpy.model_description import ModelDescription

    from eta_utility.type_hints import Number, Path, TimeStep

log = getLogger(__name__)


class FMUSimulator:
    """FMU simulator object.

    :param _id: FMU instance ID.
    :param fmu_path: Path to the FMU file.
    :param start_time: Simulation start time in seconds.
    :param stop_time: Simulation stop time in seconds.
    :param step_size: Simulation step size in seconds.
    :param names_inputs: List of input names that correspond to names used in the FMU file (e.g. ['u', 'p']).
        If the step function is going to be used with lists as input values, this list will be used to translate
        between the list position and the variable name in the FMU.
    :param names_outputs: List of output names that correspond to names used in the FMU file
        (e.g. ['y', 'th', 'thdot']). If the step function should return only specific values instead of all
        results as a dictionary, this parameter can be specified to determine which parameters should be returned.
    :param init_values: Starting values for parameters that should be pushed to the FMU with names corresponding
        to variables in the FMU.
    :param return_type: "dict" or "list". Alter the standard behavior, which is to return lists from the step and
        read_values functions only if both "names_inputs" and "names_outputs" are specified. This parameter
        forces the step and read_values functions to always return either dictionaries or lists.
    """

    #: Accessor group through which each internal variable type is exchanged with the FMU. Enumerations have no
    #: accessor functions of their own and are therefore routed through the integer getter/setter.
    _ACCESS_GROUPS = {"real": "real", "int": "int", "enum": "int", "bool": "bool"}
    #: Name of the FMU getter method for each accessor group.
    _GETTERS = {"real": "getReal", "int": "getInteger", "bool": "getBoolean"}
    #: Name of the FMU setter method for each accessor group (applied in this order).
    _SETTERS = {"real": "setReal", "int": "setInteger", "bool": "setBoolean"}

    def __init__(
        self,
        _id: int,
        fmu_path: Path,
        start_time: TimeStep = 0,
        stop_time: TimeStep = 1,
        step_size: TimeStep = 1,
        names_inputs: Sequence[str] | None = None,
        names_outputs: Sequence[str] | None = None,
        init_values: Mapping[str, float] | None = None,
        *,
        return_type: str | None = None,
    ) -> None:
        #: Path to the FMU model.
        self.fmu_path = fmu_path

        #: Start time for the simulation in time increments.
        self.start_time = start_time.total_seconds() if isinstance(start_time, timedelta) else start_time
        #: Stopping time for the simulation in time increments (only relevant if run in simulation loop).
        self.stop_time = stop_time.total_seconds() if isinstance(stop_time, timedelta) else stop_time
        #: Step size (time) for the simulation in time increments.
        self.step_size = step_size.total_seconds() if isinstance(step_size, timedelta) else step_size

        #: Model description from the FMU (contains variable names, types, references and more).
        self.model_description: ModelDescription = read_model_description(fmu_path)

        #: Variable map from model description. The map specifies the value reference and datatype of a named
        #: variable in the FMU. The structure is {'name': {'ref': <value reference>, 'type': <variable data type>}}.
        self._model_vars: dict[str, dict[str, Any]] = {}
        self.__type_map = {"Real": "real", "Boolean": "bool", "Integer": "int", "Enumeration": "enum"}
        for var in self.model_description.modelVariables:
            try:
                var_type = self.__type_map[var.type]
            except KeyError:
                # Variables of other types (e.g. String) cannot be exchanged through the numeric accessors used
                # by this class. Skip them instead of aborting construction with a bare KeyError.
                log.debug(f"Variable '{var.name}' has unsupported type '{var.type}' and will be ignored.")
                continue
            self._model_vars[var.name] = {"ref": var.valueReference, "type": var_type}

        #: Masks and references of the input variables. These allow evaluating an ordered list of input values
        #: and are typically not required when working with mappings/dictionaries as step inputs.
        #:
        #: ``_input_map`` contains one boolean mask per accessor group ("real", "int", "bool"). Each mask can be
        #: used with `itertools.compress` to pick the variables of that group from the complete, ordered set of
        #: input variables (``_inputs["refs"]``).
        #:
        #: ``_inputs`` contains the following lists:
        #:
        #: * names: List of the named input variables that are accessible in the model.
        #: * refs: List of all value references to input variables of all types (complete, ordered list).
        #: * real: List of all value references to input variables of type real.
        #: * int: List of all value references to input variables of type integer (including enumerations).
        #: * bool: List of all value references to input variables of type boolean.
        self._input_map: dict[str, list[bool]]
        self._inputs: dict[str, list[Any]]
        self._input_map, self._inputs = self._build_variable_maps(names_inputs, "Input")

        #: Masks and references of the output variables, structured exactly like ``_input_map`` and ``_inputs``.
        self._output_map: dict[str, list[bool]]
        self._outputs: dict[str, list[Any]]
        self._output_map, self._outputs = self._build_variable_maps(names_outputs, "Output")

        instance_args = {
            "guid": self.model_description.guid,
            "modelIdentifier": self.model_description.coSimulation.modelIdentifier,
            "instanceName": "FMUsimulator_" + str(_id),
        }

        #: Directory where the FMU will be extracted.
        self._unzipdir: Path = extract(fmu_path)
        try:
            try:
                #: Instance of the FMU Slave object.
                self.fmu: FMU2Slave = FMU2Slave(unzipDirectory=self._unzipdir, **instance_args)
            except Exception:
                # Loading failed, presumably because the FMU ships no binary for this platform. Try to compile
                # one from the sources contained in the FMU.
                compile_platform_binary(self.fmu_path)
                # The compiled binary is added to the FMU archive, not to the directory extracted above, so
                # extract the archive again before retrying.
                shutil.rmtree(self._unzipdir, ignore_errors=True)
                self._unzipdir = extract(fmu_path)
                self.fmu = FMU2Slave(unzipDirectory=self._unzipdir, **instance_args)

            # initialize
            self.fmu.instantiate(visible=False, callbacks=None, loggingOn=False)
            self.fmu.setupExperiment(startTime=self.start_time)

            # Set init values. Instead of the fmpy apply_start_values function, the class's own set_values
            # function is used to set the values of the simulation variables correctly; further reasons are
            # performance and simulation speed.
            self.set_values({} if init_values is None else init_values)

            self.fmu.enterInitializationMode()
            self.fmu.exitInitializationMode()
        except Exception:
            # Do not leave the extracted files behind if the simulator could not be set up.
            shutil.rmtree(self._unzipdir, ignore_errors=True)
            raise

        #: Current simulation time.
        self.time = self.start_time

        # Initialize some other parameters used to switch functionality of class methods.
        #: Return dictionaries from the step and read_values functions instead of lists.
        self._return_dict: bool
        if return_type is None:
            self._return_dict = names_inputs is None or names_outputs is None
        else:
            self._return_dict = return_type != "list"

    def _build_variable_maps(
        self, requested: Sequence[str] | None, label: str
    ) -> tuple[dict[str, list[bool]], dict[str, list[Any]]]:
        """Build the type masks and the reference lists for a set of variable names.

        :param requested: Names of the variables to include; None selects all known model variables.
        :param label: Word used in the warning for unknown names ("Input" or "Output").
        :return: Tuple of the mask dictionary and the dictionary of names and value references.
        """
        groups = ("real", "int", "bool")
        mask: dict[str, list[bool]] = {"names": [], "real": [], "int": [], "bool": []}
        names: list[str] = []
        refs: list[Any] = []

        for var in self._model_vars if requested is None else requested:
            if var not in self._model_vars:
                log.warning(
                    f"{label} variable '{var}' couldn't be found in FMU model description. Entry will be ignored."
                )
                continue

            info = self._model_vars[var]
            group = self._ACCESS_GROUPS[info["type"]]
            names.append(var)
            refs.append(info["ref"])
            for key in groups:
                mask[key].append(key == group)

        variables: dict[str, list[Any]] = {"names": names, "refs": refs}
        for key in groups:
            variables[key] = list(it.compress(refs, mask[key]))
        return mask, variables

    @property
    def input_vars(self) -> list[str]:
        """Ordered list of all available input variable names in the FMU."""
        return self._inputs["names"].copy()

    @property
    def output_vars(self) -> list[str]:
        """Ordered list of all available output variable names in the FMU."""
        return self._outputs["names"].copy()

    def read_values(self, names: Sequence[str] | None = None) -> dict[str | int, Any] | list:
        """Return current values of the simulation without advancing a simulation step or the simulation time.

        Each variable is read through the getter that matches its type (real, integer/enumeration or boolean).

        :param names: Sequence of values to read from the FMU. If this is None (default), all available values
            will be read.
        :return: Dictionary of variable names and values, or a list of values in the requested order, depending
            on the configured return type.
        :raises KeyError: If one of the names is not a known variable of the FMU.
        """
        vars_ = self._outputs["names"] if names is None else list(names)

        # Sort the value references by accessor group and remember where each value belongs in the output.
        refs: dict[str, list[Any]] = {"real": [], "int": [], "bool": []}
        positions: dict[str, list[int]] = {"real": [], "int": [], "bool": []}
        for position, var in enumerate(vars_):
            try:
                info = self._model_vars[var]
            except KeyError as e:
                msg = f"Specified an output value for a variable which is not available in the FMU: {var}"
                raise KeyError(msg) from e
            group = self._ACCESS_GROUPS[info["type"]]
            refs[group].append(info["ref"])
            positions[group].append(position)

        # Get values from the FMU (one call per type) and restore the requested order.
        output_values: list[Any] = [None] * len(vars_)
        for group, getter in self._GETTERS.items():
            if refs[group]:
                for position, value in zip(positions[group], getattr(self.fmu, getter)(refs[group])):
                    output_values[position] = value

        return dict(zip(vars_, output_values)) if self._return_dict else output_values

    def set_values(self, values: Sequence[Number | bool] | Mapping[str, Number | bool]) -> None:
        """Set values of simulation variables without advancing a simulation step or the simulation time.

        :param values: Values that should be pushed to the FMU. Names of the input_values must correspond to
            variables in the FMU. If passing as a Sequence, make sure the order corresponds to the order of the
            input_vars property.
        :raises KeyError: If a mapping contains a name which is not a known variable of the FMU.
        :raises AttributeError: If a sequence does not have the same length as the input_vars property.
        """
        groups = ("real", "int", "bool")
        vals: dict[str, list[Number | bool]]
        refs: dict[str, list[Any]]

        if isinstance(values, Mapping):
            vals = {key: [] for key in groups}
            refs = {key: [] for key in groups}
            for var, val in values.items():
                try:
                    info = self._model_vars[var]
                except KeyError as e:
                    msg = f"Specified an input value for a variable which is not available in the FMU: {var}"
                    raise KeyError(msg) from e
                group = self._ACCESS_GROUPS[info["type"]]
                refs[group].append(info["ref"])
                vals[group].append(val)
        else:
            if len(values) != len(self._inputs["refs"]):
                raise AttributeError(
                    f"Length of value list ({len(values)}) must be equal to length of input_vars "
                    f"property ({len(self._inputs['refs'])})"
                )
            refs = {key: self._inputs[key] for key in groups}
            vals = {key: list(it.compress(values, self._input_map[key])) for key in groups}

        # Only call the setters for which there actually is something to set.
        for group, setter in self._SETTERS.items():
            if refs[group]:
                getattr(self.fmu, setter)(refs[group], vals[group])

    def step(
        self,
        input_values: Sequence[Number | bool] | Mapping[str, Number | bool] | None = None,
        output_names: Sequence[str] | None = None,
        advance_time: bool = True,
        nr_substeps: int | None = None,
    ) -> dict[str | int, Any] | list[Any]:
        """Simulate next time step in the FMU with defined input values and output values.

        :param input_values: Current values that should be pushed to the FMU. Names of the input_values must
            correspond to variables in the FMU. If passing as a Sequence, make sure the order corresponds to the
            order of the input_vars property.
        :param output_names: Names of the values to read after the step. If this is None (default), all
            available output values will be read.
        :param advance_time: Decide if the FMUsimulator should add one timestep to the simulation time or not.
            This can be deactivated, if you just want to look at the result of a simulation step beforehand,
            without actually advancing simulation time.
        :param nr_substeps: if simulation steps are divided into substeps, this value will let the simulator know
            that no time violation warning is necessary.
        :return: Resulting input and output values from the FMU with the keys named corresponding to the
            variables in the FMU.
        """
        if input_values is not None:
            self.set_values(input_values)

        # Put out a warning for time limit violation. The stop time may be exceeded by one full step (which
        # consists of nr_substeps substeps) before the warning is issued.
        tolerated_steps = int(nr_substeps) if nr_substeps else 1
        if self.time + self.step_size > self.stop_time + tolerated_steps * self.step_size:
            log.warning(
                f"Simulation time {self.time + self.step_size} s exceeds specified stop time of "
                f"{self.stop_time} s. Proceed with care, simulation may become inaccurate."
            )

        # The input values were pushed above; doStep performs one step of the configured size.
        self.fmu.doStep(currentCommunicationPoint=self.time, communicationStepSize=self.step_size)

        if advance_time:
            self.time += self.step_size

        return self.read_values(output_names)

    @classmethod
    def simulate(
        cls,
        fmu_path: Path,
        start_time: TimeStep = 0,
        stop_time: TimeStep = 1,
        step_size: TimeStep = 1,
        init_values: Mapping[str, float] | None = None,
    ) -> np.ndarray:
        """Instantiate a simulator with the specified FMU, perform simulation and return results.

        The simulator is closed again (and its extracted files removed) before returning, also on errors.

        :param fmu_path: Path to the FMU file.
        :param start_time: Simulation start time in seconds.
        :param stop_time: Simulation stop time in seconds.
        :param step_size: simulation step size in seconds.
        :param init_values: Starting values for parameters that should be pushed to the FMU with names
            corresponding to variables in the FMU.
        :return: Record array with one field per output variable and one row per simulation step.
        :raises ValueError: If the step size is not positive or the FMU exposes no output variables.
        """
        simulator = cls(0, fmu_path, start_time, stop_time, step_size, init_values=init_values)
        try:
            if simulator.step_size <= 0:
                raise ValueError(f"The simulation step size must be positive, got {simulator.step_size}.")

            names = simulator.output_vars
            if not names:
                raise ValueError("There must be some output variables specified for the simulator.")
            dt = np.dtype([(name, float) for name in names])

            # Collect the rows while stepping instead of preallocating from a computed step count. A computed
            # count can disagree with the loop condition due to floating point rounding of the accumulated time.
            rows = []
            while simulator.time <= simulator.stop_time:
                step_result = simulator.step()
                if not isinstance(step_result, dict):
                    raise TypeError("The simulator needs a dictionary return.")
                rows.append(tuple(step_result[name] for name in names))
        finally:
            simulator.close()

        return np.rec.array(np.array(rows, dtype=dt))

    def reset(self, init_values: Mapping[str, float] | None = None) -> None:
        """Reset FMU to specified initial condition.

        :param init_values: Values for initialization. If this is None (default), no values are set.
        """
        self.time = self.start_time
        self.fmu.reset()
        self.fmu.setupExperiment(startTime=self.start_time)

        # Set init values. Instead of the fmpy apply_start_values function, the class's own set_values function
        # is used to set the values of the simulation variables correctly; further reasons are performance and
        # simulation speed.
        self.set_values({} if init_values is None else init_values)

        self.fmu.enterInitializationMode()
        self.fmu.exitInitializationMode()

    def close(self) -> None:
        """Close the FMU and tidy up the unzipped files."""
        try:
            self.fmu.terminate()
            self.fmu.freeInstance()
        finally:
            # Clean up the unzipped files even if the FMU could not be shut down properly.
            shutil.rmtree(self._unzipdir)
class FMU2MESlave(FMU2Model):
    """Helper class for simulation of FMU2 FMUs.

    This is a wrapper for FMU2Model. It can be used to wrap model exchange FMUs such that they can be simulated
    similar to a co-simulation FMU. This is especially helpful for testing model exchange FMUs.

    It exposes an interface that emulates part of the original FMU2Slave class from fmpy.
    """

    # Define some constants that might be needed according to the FMI Standard
    fmi2True: int = 1  # noqa: N815
    fmi2False: int = 0  # noqa: N815

    fmi2OK: int = 0  # noqa: N815
    fmi2Warning: int = 1  # noqa: N815
    fmi2Discard: int = 2  # noqa: N815
    fmi2Error: int = 3  # noqa: N815
    fmi2Fatal: int = 4  # noqa: N815
    fmi2Pending: int = 5  # noqa: N815

    def __init__(self, **kwargs: Any) -> None:
        r"""Initialize the FMU2Slave object. See also the fmpy documentation :py:class:`fmpy.fmi2.FMU2Model`.

        :param Any \**kwargs: Accepts any parameters that fmpy.FMU2Model accepts. ``unzipDirectory`` is
            required, because the model description is read from it.
        """
        super().__init__(**kwargs)
        self._model_description: ModelDescription = read_model_description(kwargs["unzipDirectory"])
        # The solver is created in exitInitializationMode, once the experiment setup is known.
        self._solver: CVodeSolver
        self._tolerance: float = 0.0
        # None means that no stop time has been defined for the experiment.
        self._stop_time: float | None = None
        self._start_time: float = 0.0

    def setupExperiment(  # noqa: N802
        self,
        tolerance: float | None = None,
        startTime: float = 0.0,  # noqa:N803
        stopTime: float | None = None,  # noqa:N803
        **kwargs: Any,
    ) -> int:
        """Experiment setup and storage of required values.

        .. seealso:: fmpy.fmi2.FMU2Model.setupExperiment

        :param tolerance: Solver tolerance, default value is 1e-5.
        :param startTime: Starting time for the experiment.
        :param stopTime: Ending time for the experiment. If this is None (default), no stop time is defined.
        :param kwargs: Other keyword arguments that might be required for FMU2Model.setupExperiment in the future.
        :return: FMI2 return value.
        """
        self._tolerance = 1e-5 if tolerance is None else tolerance
        self._stop_time = stopTime
        self._start_time = startTime

        kwargs["tolerance"] = self._tolerance
        # Forward an undefined stop time as None. Replacing it by 0.0 would declare a stop time of zero to the
        # FMU instead of leaving it undefined.
        kwargs["stopTime"] = self._stop_time
        kwargs["startTime"] = self._start_time

        return super().setupExperiment(**kwargs)

    def exitInitializationMode(self, **kwargs: Any) -> int:  # noqa: N802
        """Exit the initialization mode and set up the cvode solver.

        See also: :py:class:`fmpy.fmi2.FMU2Model.exitInitializationMode`

        :param kwargs: Keyword arguments accepted by FMU2Model.exitInitializationMode.
        :return: FMI2 return value.
        """
        ret = super().exitInitializationMode(**kwargs)

        # Collect discrete states from FMU
        self.eventInfo.newDiscreteStatesNeeded = self.fmi2True
        self.eventInfo.terminateSimulation = self.fmi2False

        # NOTE(review): the loop relies on newDiscreteStates() updating self.eventInfo in place - confirm for the
        # fmpy version in use.
        while (
            self.eventInfo.newDiscreteStatesNeeded == self.fmi2True
            and self.eventInfo.terminateSimulation == self.fmi2False
        ):
            # update discrete states
            self.newDiscreteStates()
        self.enterContinuousTimeMode()

        # Initialize solver
        solver_args: dict[str, Any] = {
            "set_time": self.setTime,
            "startTime": self._start_time,
            "relativeTolerance": self._tolerance,
            "nx": self._model_description.numberOfContinuousStates,
            "nz": self._model_description.numberOfEventIndicators,
            "get_x": self.getContinuousStates,
            "set_x": self.setContinuousStates,
            "get_dx": self.getDerivatives,
            "get_z": self.getEventIndicators,
        }
        # Limit the solver step size to 1/50 of the experiment duration. Without a stop time after the start
        # time, that limit would be zero or negative, so the solver's own default is used instead.
        if self._stop_time is not None and self._stop_time > self._start_time:
            solver_args["maxStep"] = (self._stop_time - self._start_time) / 50.0
        self._solver = CVodeSolver(**solver_args)

        return ret

    def doStep(  # noqa: N802
        self,
        currentCommunicationPoint: float,  # noqa: N803
        communicationStepSize: float,  # noqa: N803
        noSetFMUStatePriorToCurrentPoint: int | None = None,  # noqa: N803
    ) -> int:
        """Perform a simulation step. Advance simulation from *currentCommunicationPoint* by
        *communicationStepSize*.

        Also refer to the FMI2 Standard documentation.

        :param currentCommunicationPoint: Current time stamp (starting point for simulation step).
        :param communicationStepSize: Time step size.
        :param noSetFMUStatePriorToCurrentPoint: Determine whether a reset before *currentCommunicationPoint* is
            possible. Must be either fmi2True or fmi2False. The value is currently not evaluated.
        :return: FMU2 return value.
        """
        time = currentCommunicationPoint
        step_size = communicationStepSize

        # Perform a solver step and reset the FMU Model time to the time reached by the solver.
        # NOTE(review): the event flag returned by the solver is ignored and no event handling takes place.
        # Presumably the solver can stop before the requested time when a state event occurs - confirm whether
        # this needs to be handled for the FMUs in use.
        _, time = self._solver.step(time, time + step_size)
        self.setTime(time)

        # Notify the FMU that the integrator step is complete; the returned event information is not evaluated.
        self.completedIntegratorStep()

        return self.fmi2OK